/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A base for {@link MultiTableInputFormat}s. Receives a list of {@link Scan} instances that define
 * the input tables, filters, etc. Subclasses may use other TableRecordReader implementations.
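 * <p>
 * For example, a subclass might define its input as one scan per table. A minimal sketch (the
 * class name, table names, and scan setup are illustrative, not prescribed by this API):
 *
 * <pre>{@code
 * public class ExampleMultiTableInputFormat extends MultiTableInputFormatBase {
 *   public ExampleMultiTableInputFormat() {
 *     List<Scan> scans = new ArrayList<>();
 *     for (String name : new String[] { "table1", "table2" }) {
 *       Scan scan = new Scan();
 *       // Every scan must carry the name of the table it targets.
 *       scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(name));
 *       scans.add(scan);
 *     }
 *     setScans(scans);
 *   }
 * }
 * }</pre>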
 */
@InterfaceAudience.Public
public abstract class MultiTableInputFormatBase
  extends InputFormat<ImmutableBytesWritable, Result> {

  private static final Logger LOG = LoggerFactory.getLogger(MultiTableInputFormatBase.class);

  /** Holds the set of scans used to define the input. */
  private List<Scan> scans;

  /** The reader scanning the table, can be a custom one. */
  private TableRecordReader tableRecordReader = null;

  /**
   * Builds a TableRecordReader. If no TableRecordReader was provided, uses the default.
   * @param split   The split to work with.
   * @param context The current context.
   * @return The newly created record reader.
   * @throws IOException          When creating the reader fails.
   * @throws InterruptedException When the record reader initialization fails.
   * @see InputFormat#createRecordReader(InputSplit, TaskAttemptContext)
   */
  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split,
    TaskAttemptContext context) throws IOException, InterruptedException {
    TableSplit tSplit = (TableSplit) split;
    LOG.info(MessageFormat.format("Input split length: {0} bytes.", tSplit.getLength()));

    if (tSplit.getTable() == null) {
      throw new IOException("Cannot create a record reader because of a"
        + " previous error. Please look at the previous log lines from"
        + " the task's full log for more details.");
    }
    final Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
    Table table = connection.getTable(tSplit.getTable());

    if (this.tableRecordReader == null) {
      this.tableRecordReader = new TableRecordReader();
    }
    final TableRecordReader trr = this.tableRecordReader;

    try {
      Scan sc = tSplit.getScan();
      sc.setStartRow(tSplit.getStartRow());
      sc.setStopRow(tSplit.getEndRow());
      trr.setScan(sc);
      trr.setTable(table);
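      // Wrap the TableRecordReader in an anonymous RecordReader so that closing the returned
      // reader also closes the Connection opened above for this split.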
      return new RecordReader<ImmutableBytesWritable, Result>() {

        @Override
        public void close() throws IOException {
          trr.close();
          connection.close();
        }

        @Override
        public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
          return trr.getCurrentKey();
        }

        @Override
        public Result getCurrentValue() throws IOException, InterruptedException {
          return trr.getCurrentValue();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
          return trr.getProgress();
        }

        @Override
        public void initialize(InputSplit inputsplit, TaskAttemptContext context)
          throws IOException, InterruptedException {
          trr.initialize(inputsplit, context);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
          return trr.nextKeyValue();
        }
      };
    } catch (IOException ioe) {
      // If there is an exception, make sure that all
      // resources are closed and released.
      trr.close();
      connection.close();
      throw ioe;
    }
  }

  /**
   * Calculates the splits that will serve as input for the map tasks. One split is created per
   * region of each input table, for every {@link Scan} whose row range overlaps that region.
   * @param context The current job context.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    if (scans.isEmpty()) {
      throw new IOException("No scans were provided.");
    }

    // Group the scans by the table they target.
    Map<TableName, List<Scan>> tableMaps = new HashMap<>();
    for (Scan scan : scans) {
      byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
      if (tableNameBytes == null) {
        throw new IOException("A scan object did not have a table name");
      }

      TableName tableName = TableName.valueOf(tableNameBytes);

      List<Scan> scanList = tableMaps.get(tableName);
      if (scanList == null) {
        scanList = new ArrayList<>();
        tableMaps.put(tableName, scanList);
      }
      scanList.add(scan);
    }

    List<InputSplit> splits = new ArrayList<>();
    // Make a single Connection to the cluster and use it across all tables.
    try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration())) {
      for (Map.Entry<TableName, List<Scan>> entry : tableMaps.entrySet()) {
        TableName tableName = entry.getKey();
        List<Scan> scanList = entry.getValue();
        try (Table table = conn.getTable(tableName);
          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
          RegionSizeCalculator sizeCalculator =
            new RegionSizeCalculator(regionLocator, conn.getAdmin());
          Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
          for (Scan scan : scanList) {
            if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
              throw new IOException(
                "Expecting at least one region for table: " + tableName.getNameAsString());
            }
            int count = 0;

            byte[] startRow = scan.getStartRow();
            byte[] stopRow = scan.getStopRow();

            for (int i = 0; i < keys.getFirst().length; i++) {
              if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
                continue;
              }

              // Skip regions that do not overlap the scan's row range. An empty start or
              // stop row means the range is unbounded on that side.
              if (
                (startRow.length == 0 || keys.getSecond()[i].length == 0
                  || Bytes.compareTo(startRow, keys.getSecond()[i]) < 0)
                  && (stopRow.length == 0 || Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)
              ) {
                // Clamp the split boundaries to the scan's row range.
                byte[] splitStart =
                  startRow.length == 0 || Bytes.compareTo(keys.getFirst()[i], startRow) >= 0
                    ? keys.getFirst()[i]
                    : startRow;
                byte[] splitStop =
                  (stopRow.length == 0 || Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0)
                    && keys.getSecond()[i].length > 0 ? keys.getSecond()[i] : stopRow;

                HRegionLocation hregionLocation =
                  regionLocator.getRegionLocation(keys.getFirst()[i], false);
                String regionHostname = hregionLocation.getHostname();
                HRegionInfo regionInfo = hregionLocation.getRegionInfo();
                String encodedRegionName = regionInfo.getEncodedName();
                long regionSize = sizeCalculator.getRegionSize(regionInfo.getRegionName());

                TableSplit split = new TableSplit(table.getName(), scan, splitStart, splitStop,
                  regionHostname, encodedRegionName, regionSize);

                splits.add(split);

                if (LOG.isDebugEnabled()) {
                  LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
                }
              }
            }
          }
        }
      }
    }

    return splits;
  }

  /**
   * Test if the given region is to be included in the InputSplit while splitting the regions of a
   * table.
   * <p>
   * This optimization is effective when there is a specific reason to exclude an entire region
   * from the M-R job (and hence not contribute an InputSplit), given the start and end keys of
   * that region.
   * <br>
   * Useful when we need to remember the last-processed top record and continuously revisit the
   * [last, current) interval for M-R processing. In addition to reducing the number of
   * InputSplits, this also reduces the load on the region server, due to the ordering of the keys.
   * <br>
   * <br>
   * Note: it is possible that <code>endKey.length() == 0</code>, for the last (most recent)
   * region of a table.
   * <br>
   * Override this method if you want to bulk exclude regions from M-R processing. By default, no
   * region is excluded (i.e. all regions are included).
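   * <p>
   * For example, an override could skip every region that ends at or before some boundary row. A
   * minimal sketch (the {@code boundary} key is illustrative, not part of this class):
   *
   * <pre>{@code
   * protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
   *   // Keep the last region (empty end key) and any region that extends past the
   *   // hypothetical boundary; regions lying entirely before it are excluded.
   *   return endKey.length == 0 || Bytes.compareTo(endKey, boundary) > 0;
   * }
   * }</pre>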
   * @param startKey Start key of the region
   * @param endKey   End key of the region
   * @return true, if this region needs to be included as part of the input (default).
   */
  protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
    return true;
  }

  /**
   * Allows subclasses to get the list of {@link Scan} objects.
   */
  protected List<Scan> getScans() {
    return this.scans;
  }

  /**
   * Allows subclasses to set the list of {@link Scan} objects.
   * @param scans The list of {@link Scan} used to define the input
   */
  protected void setScans(List<Scan> scans) {
    this.scans = scans;
  }

  /**
   * Allows subclasses to set the {@link TableRecordReader}.
   * @param tableRecordReader A different {@link TableRecordReader} implementation.
   */
  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
    this.tableRecordReader = tableRecordReader;
  }
}