/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Emits sorted Puts. Reads in all Puts from passed Iterator, sorts them, then emits Puts in sorted
 * order. If lots of columns per row, it will use lots of memory sorting.
 * <p>
 * Concretely, every cell of every incoming {@link Put} for a row is converted to a
 * {@link KeyValue} (with TTL, ACL and visibility tags derived from the Put's attributes), the
 * KeyValues are de-duplicated and ordered with {@link CellComparator#getInstance()}, and then
 * written out in that order.
 * @see HFileOutputFormat2
 * @see CellSortReducer
 */
@InterfaceAudience.Public
public class PutSortReducer
  extends Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {

  /**
   * Configuration key bounding the summed {@link KeyValue#heapSize()} buffered in memory before
   * the current batch is emitted.
   */
  private static final String ROW_THRESHOLD_KEY = "putsortreducer.row.threshold";

  /** Default value for {@link #ROW_THRESHOLD_KEY}: 1 GiB. */
  private static final long DEFAULT_ROW_THRESHOLD = 1L << 30;

  // the cell creator
  private CellCreator kvCreator;

  @Override
  protected void
    setup(Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
      throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    this.kvCreator = new CellCreator(conf);
  }

  /**
   * Sorts and emits the cells of all Puts received for one row.
   * <p>
   * Cells are buffered in a sorted set until their summed heap size reaches the configured
   * threshold; the batch is then written in sorted order. If more Puts remain after a batch, a
   * {@code (null, null)} record is written before the next batch starts (see inline comment).
   * @param row     the row key shared by all {@code puts}
   * @param puts    the Puts for {@code row}
   * @param context the task context used for configuration, status and output
   * @throws IOException if a Put carries an invalid visibility expression, or on write failure
   */
  @Override
  protected void reduce(ImmutableBytesWritable row, Iterable<Put> puts,
    Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
    throws IOException, InterruptedException {
    // although reduce() is called per-row, handle pathological case
    long threshold = context.getConfiguration().getLong(ROW_THRESHOLD_KEY, DEFAULT_ROW_THRESHOLD);
    Iterator<Put> iter = puts.iterator();
    while (iter.hasNext()) {
      TreeSet<KeyValue> map = new TreeSet<>(CellComparator.getInstance());
      long curSize = 0;
      // Always consume at least one Put per batch: with a non-positive threshold a plain
      // while-loop would never advance the iterator and the outer loop would spin forever.
      // Otherwise stop at the end of input or at the RAM threshold.
      do {
        Put p = iter.next();
        List<Tag> putTags = buildPutTags(p);
        for (List<ExtendedCell> cells : ClientInternalHelper.getExtendedFamilyCellMap(p).values()) {
          for (ExtendedCell cell : cells) {
            KeyValue kv = toKeyValue(cell, putTags);
            if (map.add(kv)) { // don't count duplicated kv into size
              curSize += kv.heapSize();
            }
          }
        }
      } while (iter.hasNext() && curSize < threshold);

      context.setStatus("Read " + map.size() + " entries of " + map.getClass() + "("
        + StringUtils.humanReadableInt(curSize) + ")");
      int index = 0;
      for (KeyValue kv : map) {
        context.write(row, kv);
        if (++index % 100 == 0) {
          context.setStatus("Wrote " + index);
        }
      }

      // if we have more entries to process
      if (iter.hasNext()) {
        // force flush because we cannot guarantee intra-row sorted order
        context.write(null, null);
      }
    }
  }

  /**
   * Builds the tags derived from Put-level attributes: a TTL tag when a TTL is set, an ACL tag
   * when an ACL is set, and visibility-expression tags when a cell visibility is set.
   * @param p the Put whose attributes are inspected
   * @return a new mutable list of Put-level tags (possibly empty)
   * @throws IOException if the Put's visibility expression cannot be deserialized
   */
  private List<Tag> buildPutTags(Put p) throws IOException {
    List<Tag> tags = new ArrayList<>();
    long ttl = p.getTTL();
    if (ttl != Long.MAX_VALUE) {
      // add TTL tag if found
      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
    }
    byte[] acl = p.getACL();
    if (acl != null) {
      // add ACL tag if found
      tags.add(new ArrayBackedTag(TagType.ACL_TAG_TYPE, acl));
    }
    try {
      CellVisibility cellVisibility = p.getCellVisibility();
      if (cellVisibility != null) {
        // add the visibility labels if any
        tags.addAll(kvCreator.getVisibilityExpressionResolver()
          .createVisibilityExpTags(cellVisibility.getExpression()));
      }
    } catch (DeserializationException e) {
      // We just throw exception here. Should we allow other mutations to proceed by
      // just ignoring the bad one?
      throw new IOException("Invalid visibility expression found in mutation " + p, e);
    }
    return tags;
  }

  /**
   * Converts one cell of a Put into the KeyValue written to HFiles.
   * <p>
   * The cell receives the Put-level tags plus its own tags. A fresh list is used for every cell,
   * because {@link TagUtil#carryForwardTags} appends into the list it is given; sharing one list
   * would carry one cell's tags onto its sibling cells.
   * @param cell    the source cell
   * @param putTags the Put-level tags; not modified
   * @return the KeyValue for {@code cell}
   */
  private KeyValue toKeyValue(ExtendedCell cell, List<Tag> putTags) throws IOException {
    List<Tag> cellTags = TagUtil.carryForwardTags(new ArrayList<>(putTags), cell);
    if (cellTags == null || cellTags.isEmpty()) {
      return KeyValueUtil.ensureKeyValue(cell);
    }
    // Creating the KV which needs to be directly written to HFiles. Using the Facade
    // KVCreator for creation of kvs.
    return (KeyValue) kvCreator.create(cell.getRowArray(), cell.getRowOffset(),
      cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
      cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
      cell.getTimestamp(), cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
      cellTags);
  }
}