001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.net.InetAddress; 023import java.net.InetSocketAddress; 024import java.net.UnknownHostException; 025import java.util.ArrayList; 026import java.util.HashMap; 027import java.util.List; 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.HRegionLocation; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.client.Admin; 032import org.apache.hadoop.hbase.client.Connection; 033import org.apache.hadoop.hbase.client.RegionLocator; 034import org.apache.hadoop.hbase.client.Result; 035import org.apache.hadoop.hbase.client.Scan; 036import org.apache.hadoop.hbase.client.Table; 037import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 038import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 039import org.apache.hadoop.hbase.util.Addressing; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.hbase.util.Pair; 042import org.apache.hadoop.hbase.util.Strings; 043import org.apache.hadoop.mapreduce.InputFormat; 044import org.apache.hadoop.mapreduce.InputSplit; 045import org.apache.hadoop.mapreduce.JobContext; 046import org.apache.hadoop.mapreduce.RecordReader; 047import org.apache.hadoop.mapreduce.TaskAttemptContext; 048import org.apache.hadoop.net.DNS; 049import org.apache.yetus.audience.InterfaceAudience; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053/** 054 * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName}, an 055 * {@link Scan} instance that defines the input columns etc. Subclasses may use other 056 * TableRecordReader implementations. Subclasses MUST ensure initializeTable(Connection, TableName) 057 * is called for an instance to function properly. Each of the entry points to this class used by 058 * the MapReduce framework, {@link #createRecordReader(InputSplit, TaskAttemptContext)} and 059 * {@link #getSplits(JobContext)}, will call {@link #initialize(JobContext)} as a convenient 060 * centralized location to handle retrieving the necessary configuration information. If your 061 * subclass overrides either of these methods, either call the parent version or call initialize 062 * yourself. 063 * <p> 064 * An example of a subclass: 065 * 066 * <pre> 067 * class ExampleTIF extends TableInputFormatBase { 068 * 069 * {@literal @}Override 070 * protected void initialize(JobContext context) throws IOException { 071 * // We are responsible for the lifecycle of this connection until we hand it over in 072 * // initializeTable. 073 * Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create( 074 * job.getConfiguration())); 075 * TableName tableName = TableName.valueOf("exampleTable"); 076 * // mandatory. once passed here, TableInputFormatBase will handle closing the connection. 077 * initializeTable(connection, tableName); 078 * byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), 079 * Bytes.toBytes("columnB") }; 080 * // optional, by default we'll get everything for the table. 081 * Scan scan = new Scan(); 082 * for (byte[] family : inputColumns) { 083 * scan.addFamily(family); 084 * } 085 * Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*")); 086 * scan.setFilter(exampleFilter); 087 * setScan(scan); 088 * } 089 * } 090 * </pre> 091 * 092 * The number of InputSplits(mappers) match the number of regions in a table by default. Set 093 * "hbase.mapreduce.tableinput.mappers.per.region" to specify how many mappers per region, set this 094 * property will disable autobalance below.\ Set "hbase.mapreduce.tif.input.autobalance" to enable 095 * autobalance, hbase will assign mappers based on average region size; For regions, whose size 096 * larger than average region size may assigned more mappers, and for smaller one, they may group 097 * together to use one mapper. If actual average region size is too big, like 50G, it is not good to 098 * only assign 1 mapper for those large regions. Use "hbase.mapreduce.tif.ave.regionsize" to set max 099 * average region size when enable "autobalanece", default mas average region size is 8G. 100 */ 101@InterfaceAudience.Public 102public abstract class TableInputFormatBase extends InputFormat<ImmutableBytesWritable, Result> { 103 104 private static final Logger LOG = LoggerFactory.getLogger(TableInputFormatBase.class); 105 106 private static final String NOT_INITIALIZED = "The input format instance has not been properly " 107 + "initialized. Ensure you call initializeTable either in your constructor or initialize " 108 + "method"; 109 private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" 110 + " previous error. Please look at the previous logs lines from" 111 + " the task's full log for more details."; 112 113 /** Specify if we enable auto-balance to set number of mappers in M/R jobs. */ 114 public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.tif.input.autobalance"; 115 /** 116 * In auto-balance, we split input by ave region size, if calculated region size is too big, we 117 * can set it. 118 */ 119 public static final String MAX_AVERAGE_REGION_SIZE = "hbase.mapreduce.tif.ave.regionsize"; 120 121 /** Set the number of Mappers for each region, all regions have same number of Mappers */ 122 public static final String NUM_MAPPERS_PER_REGION = 123 "hbase.mapreduce.tableinput.mappers.per.region"; 124 125 /** 126 * Holds the details for the internal scanner. 127 * @see Scan 128 */ 129 private Scan scan = null; 130 /** The {@link Admin}. */ 131 private Admin admin; 132 /** The {@link Table} to scan. */ 133 private Table table; 134 /** The {@link RegionLocator} of the table. */ 135 private RegionLocator regionLocator; 136 /** The reader scanning the table, can be a custom one. */ 137 private TableRecordReader tableRecordReader = null; 138 /** The underlying {@link Connection} of the table. */ 139 private Connection connection; 140 /** Used to generate splits based on region size. */ 141 private RegionSizeCalculator regionSizeCalculator; 142 143 /** The reverse DNS lookup cache mapping: IPAddress => HostName */ 144 private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<>(); 145 146 /** 147 * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses the 148 * default. 149 * @param split The split to work with. 150 * @param context The current context. 151 * @return The newly created record reader. 152 * @throws IOException When creating the reader fails. 153 * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader( 154 * org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext) 155 */ 156 @Override 157 public RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split, 158 TaskAttemptContext context) throws IOException { 159 // Just in case a subclass is relying on JobConfigurable magic. 160 if (table == null) { 161 initialize(context); 162 } 163 // null check in case our child overrides getTable to not throw. 164 try { 165 if (getTable() == null) { 166 // initialize() must not have been implemented in the subclass. 167 throw new IOException(INITIALIZATION_ERROR); 168 } 169 } catch (IllegalStateException exception) { 170 throw new IOException(INITIALIZATION_ERROR, exception); 171 } 172 TableSplit tSplit = (TableSplit) split; 173 LOG.info("Input split length: " + Strings.humanReadableInt(tSplit.getLength()) + " bytes."); 174 final TableRecordReader trr = 175 this.tableRecordReader != null ? this.tableRecordReader : new TableRecordReader(); 176 Scan sc = new Scan(this.scan); 177 sc.withStartRow(tSplit.getStartRow()); 178 sc.withStopRow(tSplit.getEndRow()); 179 trr.setScan(sc); 180 trr.setTable(getTable()); 181 return new RecordReader<ImmutableBytesWritable, Result>() { 182 183 @Override 184 public void close() throws IOException { 185 trr.close(); 186 closeTable(); 187 } 188 189 @Override 190 public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException { 191 return trr.getCurrentKey(); 192 } 193 194 @Override 195 public Result getCurrentValue() throws IOException, InterruptedException { 196 return trr.getCurrentValue(); 197 } 198 199 @Override 200 public float getProgress() throws IOException, InterruptedException { 201 return trr.getProgress(); 202 } 203 204 @Override 205 public void initialize(InputSplit inputsplit, TaskAttemptContext context) 206 throws IOException, InterruptedException { 207 trr.initialize(inputsplit, context); 208 } 209 210 @Override 211 public boolean nextKeyValue() throws IOException, InterruptedException { 212 return trr.nextKeyValue(); 213 } 214 }; 215 } 216 217 protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException { 218 return getRegionLocator().getStartEndKeys(); 219 } 220 221 /** 222 * Calculates the splits that will serve as input for the map tasks. 223 * @param context The current job context. 224 * @return The list of input splits. 225 * @throws IOException When creating the list of splits fails. 226 * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( org.apache.hadoop.mapreduce.JobContext) 227 */ 228 @Override 229 public List<InputSplit> getSplits(JobContext context) throws IOException { 230 boolean closeOnFinish = false; 231 232 // Just in case a subclass is relying on JobConfigurable magic. 233 if (table == null) { 234 initialize(context); 235 closeOnFinish = true; 236 } 237 238 // null check in case our child overrides getTable to not throw. 239 try { 240 if (getTable() == null) { 241 // initialize() must not have been implemented in the subclass. 242 throw new IOException(INITIALIZATION_ERROR); 243 } 244 } catch (IllegalStateException exception) { 245 throw new IOException(INITIALIZATION_ERROR, exception); 246 } 247 248 try { 249 List<InputSplit> splits = oneInputSplitPerRegion(); 250 251 // set same number of mappers for each region 252 if (context.getConfiguration().get(NUM_MAPPERS_PER_REGION) != null) { 253 int nSplitsPerRegion = context.getConfiguration().getInt(NUM_MAPPERS_PER_REGION, 1); 254 List<InputSplit> res = new ArrayList<>(); 255 for (int i = 0; i < splits.size(); i++) { 256 List<InputSplit> tmp = createNInputSplitsUniform(splits.get(i), nSplitsPerRegion); 257 res.addAll(tmp); 258 } 259 return res; 260 } 261 262 // The default value of "hbase.mapreduce.input.autobalance" is false. 263 if (context.getConfiguration().getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false)) { 264 long maxAveRegionSize = 265 context.getConfiguration().getLong(MAX_AVERAGE_REGION_SIZE, 8L * 1073741824); // 8GB 266 return calculateAutoBalancedSplits(splits, maxAveRegionSize); 267 } 268 269 // return one mapper per region 270 return splits; 271 } finally { 272 if (closeOnFinish) { 273 closeTable(); 274 } 275 } 276 } 277 278 /** 279 * Create one InputSplit per region 280 * @return The list of InputSplit for all the regions 281 * @throws IOException throws IOException 282 */ 283 private List<InputSplit> oneInputSplitPerRegion() throws IOException { 284 if (regionSizeCalculator == null) { 285 // Initialize here rather than with the other resources because this involves 286 // a full scan of meta, which can be heavy. We might as well only do it if/when necessary. 287 regionSizeCalculator = createRegionSizeCalculator(getRegionLocator(), getAdmin()); 288 } 289 290 TableName tableName = getTable().getName(); 291 292 Pair<byte[][], byte[][]> keys = getStartEndKeys(); 293 if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { 294 HRegionLocation regLoc = 295 getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); 296 if (null == regLoc) { 297 throw new IOException("Expecting at least one region."); 298 } 299 List<InputSplit> splits = new ArrayList<>(1); 300 long regionSize = regionSizeCalculator.getRegionSize(regLoc.getRegion().getRegionName()); 301 // In the table input format for single table we do not need to 302 // store the scan object in table split because it can be memory intensive and redundant 303 // information to what is already stored in conf SCAN. See HBASE-25212 304 TableSplit split = 305 new TableSplit(tableName, null, HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, 306 regLoc.getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize); 307 splits.add(split); 308 return splits; 309 } 310 List<InputSplit> splits = new ArrayList<>(keys.getFirst().length); 311 for (int i = 0; i < keys.getFirst().length; i++) { 312 if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { 313 continue; 314 } 315 316 byte[] startRow = scan.getStartRow(); 317 byte[] stopRow = scan.getStopRow(); 318 // determine if the given start an stop key fall into the region 319 if ( 320 (startRow.length == 0 || keys.getSecond()[i].length == 0 321 || Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) 322 && (stopRow.length == 0 || Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0) 323 ) { 324 byte[] splitStart = 325 startRow.length == 0 || Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 326 ? keys.getFirst()[i] 327 : startRow; 328 byte[] splitStop = 329 (stopRow.length == 0 || Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) 330 && keys.getSecond()[i].length > 0 ? keys.getSecond()[i] : stopRow; 331 332 HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false); 333 // The below InetSocketAddress creation does a name resolution. 334 InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort()); 335 if (isa.isUnresolved()) { 336 LOG.warn("Failed resolve " + isa); 337 } 338 InetAddress regionAddress = isa.getAddress(); 339 String regionLocation; 340 regionLocation = reverseDNS(regionAddress); 341 342 byte[] regionName = location.getRegion().getRegionName(); 343 String encodedRegionName = location.getRegion().getEncodedName(); 344 long regionSize = regionSizeCalculator.getRegionSize(regionName); 345 // In the table input format for single table we do not need to 346 // store the scan object in table split because it can be memory intensive and redundant 347 // information to what is already stored in conf SCAN. See HBASE-25212 348 TableSplit split = new TableSplit(tableName, null, splitStart, splitStop, regionLocation, 349 encodedRegionName, regionSize); 350 splits.add(split); 351 if (LOG.isDebugEnabled()) { 352 LOG.debug("getSplits: split -> " + i + " -> " + split); 353 } 354 } 355 } 356 return splits; 357 } 358 359 /** 360 * Create n splits for one InputSplit, For now only support uniform distribution 361 * @param split A TableSplit corresponding to a range of rowkeys 362 * @param n Number of ranges after splitting. Pass 1 means no split for the range Pass 2 if 363 * you want to split the range in two; 364 * @return A list of TableSplit, the size of the list is {@code n} 365 */ 366 protected List<InputSplit> createNInputSplitsUniform(InputSplit split, int n) 367 throws IllegalArgumentIOException { 368 if (split == null || !(split instanceof TableSplit)) { 369 throw new IllegalArgumentIOException( 370 "InputSplit for CreateNSplitsPerRegion can not be null + " 371 + "and should be instance of TableSplit"); 372 } 373 // if n < 1, then still continue using n = 1 374 n = n < 1 ? 1 : n; 375 List<InputSplit> res = new ArrayList<>(n); 376 if (n == 1) { 377 res.add(split); 378 return res; 379 } 380 381 // Collect Region related information 382 TableSplit ts = (TableSplit) split; 383 TableName tableName = ts.getTable(); 384 String regionLocation = ts.getRegionLocation(); 385 String encodedRegionName = ts.getEncodedRegionName(); 386 long regionSize = ts.getLength(); 387 byte[] startRow = ts.getStartRow(); 388 byte[] endRow = ts.getEndRow(); 389 390 // For special case: startRow or endRow is empty 391 if (startRow.length == 0 && endRow.length == 0) { 392 startRow = new byte[1]; 393 endRow = new byte[1]; 394 startRow[0] = 0; 395 endRow[0] = -1; 396 } 397 if (startRow.length == 0 && endRow.length != 0) { 398 startRow = new byte[1]; 399 startRow[0] = 0; 400 } 401 if (startRow.length != 0 && endRow.length == 0) { 402 endRow = new byte[startRow.length]; 403 for (int k = 0; k < startRow.length; k++) { 404 endRow[k] = -1; 405 } 406 } 407 408 // Split Region into n chunks evenly 409 byte[][] splitKeys = Bytes.split(startRow, endRow, true, n - 1); 410 for (int i = 0; i < splitKeys.length - 1; i++) { 411 // In the table input format for single table we do not need to 412 // store the scan object in table split because it can be memory intensive and redundant 413 // information to what is already stored in conf SCAN. See HBASE-25212 414 // notice that the regionSize parameter may be not very accurate 415 TableSplit tsplit = new TableSplit(tableName, null, splitKeys[i], splitKeys[i + 1], 416 regionLocation, encodedRegionName, regionSize / n); 417 res.add(tsplit); 418 } 419 return res; 420 } 421 422 /** 423 * Calculates the number of MapReduce input splits for the map tasks. The number of MapReduce 424 * input splits depends on the average region size. Make it 'public' for testing 425 * @param splits The list of input splits before balance. 426 * @param maxAverageRegionSize max Average region size for one mapper 427 * @return The list of input splits. 428 * @throws IOException When creating the list of splits fails. 429 * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( org.apache.hadoop.mapreduce.JobContext) 430 */ 431 public List<InputSplit> calculateAutoBalancedSplits(List<InputSplit> splits, 432 long maxAverageRegionSize) throws IOException { 433 if (splits.size() == 0) { 434 return splits; 435 } 436 List<InputSplit> resultList = new ArrayList<>(); 437 long totalRegionSize = 0; 438 for (int i = 0; i < splits.size(); i++) { 439 TableSplit ts = (TableSplit) splits.get(i); 440 totalRegionSize += ts.getLength(); 441 } 442 long averageRegionSize = totalRegionSize / splits.size(); 443 // totalRegionSize might be overflow, and the averageRegionSize must be positive. 444 if (averageRegionSize <= 0) { 445 LOG.warn("The averageRegionSize is not positive: " + averageRegionSize + ", " 446 + "set it to Long.MAX_VALUE " + splits.size()); 447 averageRegionSize = Long.MAX_VALUE / splits.size(); 448 } 449 // if averageRegionSize is too big, change it to default as 1 GB, 450 if (averageRegionSize > maxAverageRegionSize) { 451 averageRegionSize = maxAverageRegionSize; 452 } 453 // if averageRegionSize is too small, we do not need to allocate more mappers for those 'large' 454 // region 455 // set default as 16M = (default hdfs block size) / 4; 456 if (averageRegionSize < 16 * 1048576) { 457 return splits; 458 } 459 for (int i = 0; i < splits.size(); i++) { 460 TableSplit ts = (TableSplit) splits.get(i); 461 TableName tableName = ts.getTable(); 462 String regionLocation = ts.getRegionLocation(); 463 String encodedRegionName = ts.getEncodedRegionName(); 464 long regionSize = ts.getLength(); 465 466 if (regionSize >= averageRegionSize) { 467 // make this region as multiple MapReduce input split. 468 int n = 469 (int) Math.round(Math.log(((double) regionSize) / ((double) averageRegionSize)) + 1.0); 470 List<InputSplit> temp = createNInputSplitsUniform(ts, n); 471 resultList.addAll(temp); 472 } else { 473 // if the total size of several small continuous regions less than the average region size, 474 // combine them into one MapReduce input split. 475 long totalSize = regionSize; 476 byte[] splitStartKey = ts.getStartRow(); 477 byte[] splitEndKey = ts.getEndRow(); 478 int j = i + 1; 479 while (j < splits.size()) { 480 TableSplit nextRegion = (TableSplit) splits.get(j); 481 long nextRegionSize = nextRegion.getLength(); 482 if ( 483 totalSize + nextRegionSize <= averageRegionSize 484 && Bytes.equals(splitEndKey, nextRegion.getStartRow()) 485 ) { 486 totalSize = totalSize + nextRegionSize; 487 splitEndKey = nextRegion.getEndRow(); 488 j++; 489 } else { 490 break; 491 } 492 } 493 i = j - 1; 494 // In the table input format for single table we do not need to 495 // store the scan object in table split because it can be memory intensive and redundant 496 // information to what is already stored in conf SCAN. See HBASE-25212 497 TableSplit t = new TableSplit(tableName, null, splitStartKey, splitEndKey, regionLocation, 498 encodedRegionName, totalSize); 499 resultList.add(t); 500 } 501 } 502 return resultList; 503 } 504 505 String reverseDNS(InetAddress ipAddress) throws UnknownHostException { 506 String hostName = this.reverseDNSCacheMap.get(ipAddress); 507 if (hostName == null) { 508 String ipAddressString = null; 509 try { 510 ipAddressString = DNS.reverseDns(ipAddress, null); 511 } catch (Exception e) { 512 // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from the 513 // name service. Also, in case of ipv6, we need to use the InetAddress since resolving 514 // reverse DNS using jndi doesn't work well with ipv6 addresses. 515 ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName(); 516 } 517 if (ipAddressString == null) { 518 throw new UnknownHostException("No host found for " + ipAddress); 519 } 520 hostName = Strings.domainNamePointerToHostName(ipAddressString); 521 this.reverseDNSCacheMap.put(ipAddress, hostName); 522 } 523 return hostName; 524 } 525 526 /** 527 * Test if the given region is to be included in the InputSplit while splitting the regions of a 528 * table. 529 * <p> 530 * This optimization is effective when there is a specific reasoning to exclude an entire region 531 * from the M-R job, (and hence, not contributing to the InputSplit), given the start and end keys 532 * of the same. <br> 533 * Useful when we need to remember the last-processed top record and revisit the [last, current) 534 * interval for M-R processing, continuously. In addition to reducing InputSplits, reduces the 535 * load on the region server as well, due to the ordering of the keys. <br> 536 * <br> 537 * Note: It is possible that <code>endKey.length() == 0 </code> , for the last (recent) region. 538 * <br> 539 * Override this method, if you want to bulk exclude regions altogether from M-R. By default, no 540 * region is excluded( i.e. all regions are included). 541 * @param startKey Start key of the region 542 * @param endKey End key of the region 543 * @return true, if this region needs to be included as part of the input (default). 544 */ 545 protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) { 546 return true; 547 } 548 549 /** 550 * Allows subclasses to get the {@link RegionLocator}. 551 */ 552 protected RegionLocator getRegionLocator() { 553 if (regionLocator == null) { 554 throw new IllegalStateException(NOT_INITIALIZED); 555 } 556 return regionLocator; 557 } 558 559 /** 560 * Allows subclasses to get the {@link Table}. 561 */ 562 protected Table getTable() { 563 if (table == null) { 564 throw new IllegalStateException(NOT_INITIALIZED); 565 } 566 return table; 567 } 568 569 /** 570 * Allows subclasses to get the {@link Admin}. 571 */ 572 protected Admin getAdmin() { 573 if (admin == null) { 574 throw new IllegalStateException(NOT_INITIALIZED); 575 } 576 return admin; 577 } 578 579 /** 580 * Allows subclasses to initialize the table information. 581 * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close. 582 * @param tableName The {@link TableName} of the table to process. 583 */ 584 protected void initializeTable(Connection connection, TableName tableName) throws IOException { 585 if (this.table != null || this.connection != null) { 586 LOG.warn("initializeTable called multiple times. Overwriting connection and table " 587 + "reference; TableInputFormatBase will not close these old references when done."); 588 } 589 this.table = connection.getTable(tableName); 590 this.regionLocator = connection.getRegionLocator(tableName); 591 this.admin = connection.getAdmin(); 592 this.connection = connection; 593 this.regionSizeCalculator = null; 594 } 595 596 @InterfaceAudience.Private 597 protected RegionSizeCalculator createRegionSizeCalculator(RegionLocator locator, Admin admin) 598 throws IOException { 599 return new RegionSizeCalculator(locator, admin); 600 } 601 602 /** 603 * Gets the scan defining the actual details like columns etc. 604 * @return The internal scan instance. 605 */ 606 public Scan getScan() { 607 if (this.scan == null) this.scan = new Scan(); 608 return scan; 609 } 610 611 /** 612 * Sets the scan defining the actual details like columns etc. 613 * @param scan The scan to set. 614 */ 615 public void setScan(Scan scan) { 616 this.scan = scan; 617 } 618 619 /** 620 * Allows subclasses to set the {@link TableRecordReader}. 621 * @param tableRecordReader A different {@link TableRecordReader} implementation. 622 */ 623 protected void setTableRecordReader(TableRecordReader tableRecordReader) { 624 this.tableRecordReader = tableRecordReader; 625 } 626 627 /** 628 * Handle subclass specific set up. Each of the entry points used by the MapReduce framework, 629 * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)}, 630 * will call {@link #initialize(JobContext)} as a convenient centralized location to handle 631 * retrieving the necessary configuration information and calling 632 * {@link #initializeTable(Connection, TableName)}. Subclasses should implement their initialize 633 * call such that it is safe to call multiple times. The current TableInputFormatBase 634 * implementation relies on a non-null table reference to decide if an initialize call is needed, 635 * but this behavior may change in the future. In particular, it is critical that initializeTable 636 * not be called multiple times since this will leak Connection instances. 637 */ 638 protected void initialize(JobContext context) throws IOException { 639 } 640 641 /** 642 * Close the Table and related objects that were initialized via 643 * {@link #initializeTable(Connection, TableName)}. 644 */ 645 protected void closeTable() throws IOException { 646 close(admin, table, regionLocator, connection); 647 admin = null; 648 table = null; 649 regionLocator = null; 650 connection = null; 651 regionSizeCalculator = null; 652 } 653 654 private void close(Closeable... closables) throws IOException { 655 for (Closeable c : closables) { 656 if (c != null) { 657 c.close(); 658 } 659 } 660 } 661 662}