001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Iterator;
023import java.util.List;
024import java.util.TreeSet;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.ArrayBackedTag;
027import org.apache.hadoop.hbase.CellComparator;
028import org.apache.hadoop.hbase.ExtendedCell;
029import org.apache.hadoop.hbase.KeyValue;
030import org.apache.hadoop.hbase.KeyValueUtil;
031import org.apache.hadoop.hbase.Tag;
032import org.apache.hadoop.hbase.TagType;
033import org.apache.hadoop.hbase.TagUtil;
034import org.apache.hadoop.hbase.client.ClientInternalHelper;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.exceptions.DeserializationException;
037import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
038import org.apache.hadoop.hbase.security.visibility.CellVisibility;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.mapreduce.Reducer;
041import org.apache.hadoop.util.StringUtils;
042import org.apache.yetus.audience.InterfaceAudience;
043
044/**
045 * Emits sorted Puts. Reads in all Puts from passed Iterator, sorts them, then emits Puts in sorted
046 * order. If lots of columns per row, it will use lots of memory sorting.
047 * @see HFileOutputFormat2
048 * @see CellSortReducer
049 */
050@InterfaceAudience.Public
051public class PutSortReducer
052  extends Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
053  // the cell creator
054  private CellCreator kvCreator;
055
056  @Override
057  protected void
058    setup(Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
059      throws IOException, InterruptedException {
060    Configuration conf = context.getConfiguration();
061    this.kvCreator = new CellCreator(conf);
062  }
063
064  @Override
065  protected void reduce(ImmutableBytesWritable row, Iterable<Put> puts,
066    Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
067    throws IOException, InterruptedException {
068    // although reduce() is called per-row, handle pathological case
069    long threshold =
070      context.getConfiguration().getLong("putsortreducer.row.threshold", 1L * (1 << 30));
071    Iterator<Put> iter = puts.iterator();
072    while (iter.hasNext()) {
073      TreeSet<KeyValue> map = new TreeSet<>(CellComparator.getInstance());
074      long curSize = 0;
075      // stop at the end or the RAM threshold
076      List<Tag> tags = new ArrayList<>();
077      while (iter.hasNext() && curSize < threshold) {
078        // clear the tags
079        tags.clear();
080        Put p = iter.next();
081        long t = p.getTTL();
082        if (t != Long.MAX_VALUE) {
083          // add TTL tag if found
084          tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(t)));
085        }
086        byte[] acl = p.getACL();
087        if (acl != null) {
088          // add ACL tag if found
089          tags.add(new ArrayBackedTag(TagType.ACL_TAG_TYPE, acl));
090        }
091        try {
092          CellVisibility cellVisibility = p.getCellVisibility();
093          if (cellVisibility != null) {
094            // add the visibility labels if any
095            tags.addAll(kvCreator.getVisibilityExpressionResolver()
096              .createVisibilityExpTags(cellVisibility.getExpression()));
097          }
098        } catch (DeserializationException e) {
099          // We just throw exception here. Should we allow other mutations to proceed by
100          // just ignoring the bad one?
101          throw new IOException("Invalid visibility expression found in mutation " + p, e);
102        }
103        for (List<ExtendedCell> cells : ClientInternalHelper.getExtendedFamilyCellMap(p).values()) {
104          for (ExtendedCell cell : cells) {
105            // Creating the KV which needs to be directly written to HFiles. Using the Facade
106            // KVCreator for creation of kvs.
107            KeyValue kv = null;
108            TagUtil.carryForwardTags(tags, cell);
109            if (!tags.isEmpty()) {
110              kv = (KeyValue) kvCreator.create(cell.getRowArray(), cell.getRowOffset(),
111                cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(),
112                cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(),
113                cell.getQualifierLength(), cell.getTimestamp(), cell.getValueArray(),
114                cell.getValueOffset(), cell.getValueLength(), tags);
115            } else {
116              kv = KeyValueUtil.ensureKeyValue(cell);
117            }
118            if (map.add(kv)) {// don't count duplicated kv into size
119              curSize += kv.heapSize();
120            }
121          }
122        }
123      }
124      context.setStatus("Read " + map.size() + " entries of " + map.getClass() + "("
125        + StringUtils.humanReadableInt(curSize) + ")");
126      int index = 0;
127      for (KeyValue kv : map) {
128        context.write(row, kv);
129        if (++index % 100 == 0) context.setStatus("Wrote " + index);
130      }
131
132      // if we have more entries to process
133      if (iter.hasNext()) {
134        // force flush because we cannot guarantee intra-row sorted order
135        context.write(null, null);
136      }
137    }
138  }
139}