001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.Map;
023import java.util.Map.Entry;
024import org.apache.hadoop.hbase.ExtendedCell;
025import org.apache.hadoop.hbase.client.ClientInternalHelper;
026import org.apache.hadoop.hbase.client.Put;
027import org.apache.hadoop.mapreduce.Reducer;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * Combine Puts. Merges Put instances grouped by <code>K</code> into a single instance.
034 * @see TableMapReduceUtil
035 */
036@InterfaceAudience.Public
037public class PutCombiner<K> extends Reducer<K, Put, K, Put> {
038  private static final Logger LOG = LoggerFactory.getLogger(PutCombiner.class);
039
040  @Override
041  protected void reduce(K row, Iterable<Put> vals, Context context)
042    throws IOException, InterruptedException {
043    // Using HeapSize to create an upper bound on the memory size of
044    // the puts and flush some portion of the content while looping. This
045    // flush could result in multiple Puts for a single rowkey. That is
046    // acceptable because Combiner is run as an optimization and it's not
047    // critical that all Puts are grouped perfectly.
048    long threshold =
049      context.getConfiguration().getLong("putcombiner.row.threshold", 1L * (1 << 30));
050    int cnt = 0;
051    long curSize = 0;
052    Put combinedPut = null;
053    Map<byte[], List<ExtendedCell>> combinedFamilyMap = null;
054    for (Put p : vals) {
055      cnt++;
056      if (combinedPut == null) {
057        combinedPut = p;
058        combinedFamilyMap = ClientInternalHelper.getExtendedFamilyCellMap(combinedPut);
059      } else {
060        for (Entry<byte[], List<ExtendedCell>> entry : ClientInternalHelper
061          .getExtendedFamilyCellMap(p).entrySet()) {
062          List<ExtendedCell> existCells = combinedFamilyMap.get(entry.getKey());
063          if (existCells == null) {
064            // no cells for this family yet, just put it
065            combinedFamilyMap.put(entry.getKey(), entry.getValue());
066            // do not forget to calculate the size
067            for (ExtendedCell cell : entry.getValue()) {
068              curSize += cell.heapSize();
069            }
070          } else {
071            // otherwise just add the cells to the existent list for this family
072            for (ExtendedCell cell : entry.getValue()) {
073              existCells.add(cell);
074              curSize += cell.heapSize();
075            }
076          }
077        }
078        if (cnt % 10 == 0) {
079          context.setStatus("Combine " + cnt);
080        }
081        if (curSize > threshold) {
082          if (LOG.isDebugEnabled()) {
083            LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1));
084          }
085          context.write(row, combinedPut);
086          combinedPut = null;
087          curSize = 0;
088          cnt = 0;
089        }
090      }
091    }
092    if (combinedPut != null) {
093      if (LOG.isDebugEnabled()) {
094        LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1));
095      }
096      context.write(row, combinedPut);
097    }
098  }
099}