/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Extract grouping columns from input record.
 * <p>
 * The grouping columns are configured via {@link #GROUP_COLUMNS} as a space separated list of
 * {@code family:qualifier} names; rows missing any of those columns are silently skipped.
 */
@InterfaceAudience.Public
public class GroupingTableMapper extends TableMapper<ImmutableBytesWritable, Result>
  implements Configurable {

  /**
   * JobConf parameter to specify the columns used to produce the key passed to collect from the map
   * phase.
   */
  public static final String GROUP_COLUMNS = "hbase.mapred.groupingtablemap.columns";

  /** The grouping columns, each as a {@code family:qualifier} byte array. */
  protected byte[][] columns;
  /** The current configuration. */
  private Configuration conf = null;

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the job.
   * @param table The table to be processed.
   * @param scan The scan with the columns etc.
   * @param groupColumns A space separated list of columns used to form the key used in collect.
   * @param mapper The mapper class.
   * @param job The current job.
   * @throws IOException When setting up the job fails.
   */
  @SuppressWarnings("unchecked")
  public static void initJob(String table, Scan scan, String groupColumns,
    Class<? extends TableMapper> mapper, Job job) throws IOException {
    TableMapReduceUtil.initTableMapperJob(table, scan, mapper, ImmutableBytesWritable.class,
      Result.class, job);
    // The grouping columns are read back in setConf() when the framework instantiates the mapper.
    job.getConfiguration().set(GROUP_COLUMNS, groupColumns);
  }

  /**
   * Extract the grouping columns from value to construct a new key. Pass the new key and value to
   * reduce. If any of the grouping columns are not found in the value, the record is skipped.
   * @param key The current key.
   * @param value The current value.
   * @param context The current context.
   * @throws IOException When writing the record fails.
   * @throws InterruptedException When the job is aborted.
   */
  @Override
  public void map(ImmutableBytesWritable key, Result value, Context context)
    throws IOException, InterruptedException {
    byte[][] keyVals = extractKeyValues(value);
    if (keyVals != null) {
      ImmutableBytesWritable tKey = createGroupKey(keyVals);
      context.write(tKey, value);
    }
  }

  /**
   * Extract columns values from the current record. This method returns null if any of the columns
   * are not found.
   * <p>
   * Override this method if you want to deal with nulls differently.
   * @param r The current values.
   * @return Array of byte values, or {@code null} if the record is empty or any grouping column is
   *         missing.
   */
  protected byte[][] extractKeyValues(Result r) {
    byte[][] keyVals = null;
    ArrayList<byte[]> foundList = new ArrayList<>();
    int numCols = columns.length;
    if (numCols > 0) {
      // Result.listCells() returns null (not an empty list) for an empty result; guard to avoid
      // an NPE and treat the row as "grouping columns missing", i.e. skip it.
      List<Cell> cells = r.listCells();
      if (cells == null) {
        return null;
      }
      for (Cell value : cells) {
        byte[] column =
          CellUtil.makeColumn(CellUtil.cloneFamily(value), CellUtil.cloneQualifier(value));
        for (int i = 0; i < numCols; i++) {
          if (Bytes.equals(column, columns[i])) {
            foundList.add(CellUtil.cloneValue(value));
            break;
          }
        }
      }
      // NOTE(review): values are collected in cell order, not in the configured column order, and
      // a column returned with multiple versions contributes one value per cell — with the usual
      // single-version scan each grouping column matches exactly once.
      if (foundList.size() == numCols) {
        keyVals = foundList.toArray(new byte[numCols][]);
      }
    }
    return keyVals;
  }

  /**
   * Create a key by concatenating multiple column values.
   * <p>
   * Override this function in order to produce different types of keys.
   * @param vals The current key/values.
   * @return A key generated by concatenating multiple column values, or {@code null} when
   *         {@code vals} is {@code null}.
   */
  protected ImmutableBytesWritable createGroupKey(byte[][] vals) {
    if (vals == null) {
      return null;
    }
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < vals.length; i++) {
      if (i > 0) {
        sb.append(" ");
      }
      // Values pass through a String; toBytesBinary below round-trips the \xNN escapes that
      // Bytes.toString produces for non-printable bytes.
      sb.append(Bytes.toString(vals[i]));
    }
    return new ImmutableBytesWritable(Bytes.toBytesBinary(sb.toString()));
  }

  /**
   * Returns the current configuration.
   * @return The current configuration.
   * @see org.apache.hadoop.conf.Configurable#getConf()
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Sets the configuration. This is used to set up the grouping details.
   * @param configuration The configuration to set.
   * @see org.apache.hadoop.conf.Configurable#setConf( org.apache.hadoop.conf.Configuration)
   */
  @Override
  public void setConf(Configuration configuration) {
    this.conf = configuration;
    // An unset/empty GROUP_COLUMNS yields a single empty column name which can never match a
    // real family:qualifier, so extractKeyValues() returns null and every record is skipped.
    String[] cols = conf.get(GROUP_COLUMNS, "").split(" ");
    columns = new byte[cols.length][];
    for (int i = 0; i < cols.length; i++) {
      columns[i] = Bytes.toBytes(cols[i]);
    }
  }

}