001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import com.codahale.metrics.MetricRegistry; 021import java.io.File; 022import java.io.IOException; 023import java.net.URL; 024import java.net.URLDecoder; 025import java.util.ArrayList; 026import java.util.Base64; 027import java.util.Collection; 028import java.util.Enumeration; 029import java.util.HashMap; 030import java.util.HashSet; 031import java.util.List; 032import java.util.Map; 033import java.util.Set; 034import java.util.zip.ZipEntry; 035import java.util.zip.ZipFile; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.hbase.HBaseConfiguration; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.Connection; 043import org.apache.hadoop.hbase.client.ConnectionFactory; 044import org.apache.hadoop.hbase.client.Put; 045import org.apache.hadoop.hbase.client.RegionLocator; 046import org.apache.hadoop.hbase.client.Scan; 047import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 048import org.apache.hadoop.hbase.security.User; 049import org.apache.hadoop.hbase.security.UserProvider; 050import org.apache.hadoop.hbase.security.token.TokenUtil; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.util.RegionSplitter; 053import org.apache.hadoop.hbase.zookeeper.ZKConfig; 054import org.apache.hadoop.io.Writable; 055import org.apache.hadoop.mapreduce.InputFormat; 056import org.apache.hadoop.mapreduce.Job; 057import org.apache.hadoop.util.StringUtils; 058import org.apache.yetus.audience.InterfaceAudience; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 064 065/** 066 * Utility for {@link TableMapper} and {@link TableReducer} 067 */ 068@SuppressWarnings({ "rawtypes", "unchecked" }) 069@InterfaceAudience.Public 070public class TableMapReduceUtil { 071 private static final Logger LOG = LoggerFactory.getLogger(TableMapReduceUtil.class); 072 public static final String TABLE_INPUT_CLASS_KEY = "hbase.table.input.class"; 073 074 /** 075 * Use this before submitting a TableMap job. It will appropriately set up the job. 076 * @param table The table name to read from. 077 * @param scan The scan instance with the columns, time range etc. 078 * @param mapper The mapper class to use. 079 * @param outputKeyClass The class of the output key. 080 * @param outputValueClass The class of the output value. 081 * @param job The current job to adjust. Make sure the passed job is carrying all 082 * necessary HBase configuration. 083 * @throws IOException When setting up the details fails. 084 */ 085 public static void initTableMapperJob(String table, Scan scan, 086 Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass, 087 Job job) throws IOException { 088 initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job, true); 089 } 090 091 /** 092 * Use this before submitting a TableMap job. It will appropriately set up the job. 093 * @param table The table name to read from. 094 * @param scan The scan instance with the columns, time range etc. 095 * @param mapper The mapper class to use. 096 * @param outputKeyClass The class of the output key. 097 * @param outputValueClass The class of the output value. 098 * @param job The current job to adjust. Make sure the passed job is carrying all 099 * necessary HBase configuration. 100 * @throws IOException When setting up the details fails. 101 */ 102 public static void initTableMapperJob(TableName table, Scan scan, 103 Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass, 104 Job job) throws IOException { 105 initTableMapperJob(table.getNameAsString(), scan, mapper, outputKeyClass, outputValueClass, job, 106 true); 107 } 108 109 /** 110 * Use this before submitting a TableMap job. It will appropriately set up the job. 111 * @param table Binary representation of the table name to read from. 112 * @param scan The scan instance with the columns, time range etc. 113 * @param mapper The mapper class to use. 114 * @param outputKeyClass The class of the output key. 115 * @param outputValueClass The class of the output value. 116 * @param job The current job to adjust. Make sure the passed job is carrying all 117 * necessary HBase configuration. 118 * @throws IOException When setting up the details fails. 119 */ 120 public static void initTableMapperJob(byte[] table, Scan scan, 121 Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass, 122 Job job) throws IOException { 123 initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, job, 124 true); 125 } 126 127 /** 128 * Use this before submitting a TableMap job. It will appropriately set up the job. 129 * @param table The table name to read from. 130 * @param scan The scan instance with the columns, time range etc. 131 * @param mapper The mapper class to use. 132 * @param outputKeyClass The class of the output key. 133 * @param outputValueClass The class of the output value. 134 * @param job The current job to adjust. Make sure the passed job is carrying all 135 * necessary HBase configuration. 136 * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via 137 * the distributed cache (tmpjars). 138 * @throws IOException When setting up the details fails. 139 */ 140 public static void initTableMapperJob(String table, Scan scan, 141 Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass, 142 Job job, boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass) 143 throws IOException { 144 initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job, 145 addDependencyJars, true, inputFormatClass); 146 } 147 148 /** 149 * Use this before submitting a TableMap job. It will appropriately set up the job. 150 * @param table The table name to read from. 151 * @param scan The scan instance with the columns, time range etc. 152 * @param mapper The mapper class to use. 153 * @param outputKeyClass The class of the output key. 154 * @param outputValueClass The class of the output value. 155 * @param job The current job to adjust. Make sure the passed job is carrying all 156 * necessary HBase configuration. 157 * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via 158 * the distributed cache (tmpjars). 159 * @param initCredentials whether to initialize hbase auth credentials for the job 160 * @param inputFormatClass the input format 161 * @throws IOException When setting up the details fails. 162 */ 163 public static void initTableMapperJob(String table, Scan scan, 164 Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass, 165 Job job, boolean addDependencyJars, boolean initCredentials, 166 Class<? extends InputFormat> inputFormatClass) throws IOException { 167 job.setInputFormatClass(inputFormatClass); 168 if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass); 169 if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass); 170 job.setMapperClass(mapper); 171 if (Put.class.equals(outputValueClass)) { 172 job.setCombinerClass(PutCombiner.class); 173 } 174 Configuration conf = job.getConfiguration(); 175 HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); 176 conf.set(TableInputFormat.INPUT_TABLE, table); 177 conf.set(TableInputFormat.SCAN, convertScanToString(scan)); 178 conf.setStrings("io.serializations", conf.get("io.serializations"), 179 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 180 CellSerialization.class.getName()); 181 if (addDependencyJars) { 182 addDependencyJars(job); 183 } 184 if (initCredentials) { 185 initCredentials(job); 186 } 187 } 188 189 /** 190 * Use this before submitting a TableMap job. It will appropriately set up the job. 191 * @param table Binary representation of the table name to read from. 192 * @param scan The scan instance with the columns, time range etc. 193 * @param mapper The mapper class to use. 194 * @param outputKeyClass The class of the output key. 195 * @param outputValueClass The class of the output value. 196 * @param job The current job to adjust. Make sure the passed job is carrying all 197 * necessary HBase configuration. 198 * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via 199 * the distributed cache (tmpjars). 200 * @param inputFormatClass The class of the input format 201 * @throws IOException When setting up the details fails. 202 */ 203 public static void initTableMapperJob(byte[] table, Scan scan, 204 Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass, 205 Job job, boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass) 206 throws IOException { 207 initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, job, 208 addDependencyJars, inputFormatClass); 209 } 210 211 /** 212 * Use this before submitting a TableMap job. It will appropriately set up the job. 213 * @param table Binary representation of the table name to read from. 214 * @param scan The scan instance with the columns, time range etc. 215 * @param mapper The mapper class to use. 216 * @param outputKeyClass The class of the output key. 217 * @param outputValueClass The class of the output value. 218 * @param job The current job to adjust. Make sure the passed job is carrying all 219 * necessary HBase configuration. 220 * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via 221 * the distributed cache (tmpjars). 222 * @throws IOException When setting up the details fails. 223 */ 224 public static void initTableMapperJob(byte[] table, Scan scan, 225 Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass, 226 Job job, boolean addDependencyJars) throws IOException { 227 initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, job, 228 addDependencyJars, getConfiguredInputFormat(job)); 229 } 230 231 /** 232 * @return {@link TableInputFormat} .class unless Configuration has something else at 233 * {@link #TABLE_INPUT_CLASS_KEY}. 234 */ 235 private static Class<? extends InputFormat> getConfiguredInputFormat(Job job) { 236 return (Class<? extends InputFormat>) job.getConfiguration().getClass(TABLE_INPUT_CLASS_KEY, 237 TableInputFormat.class); 238 } 239 240 /** 241 * Use this before submitting a TableMap job. It will appropriately set up the job. 242 * @param table The table name to read from. 243 * @param scan The scan instance with the columns, time range etc. 244 * @param mapper The mapper class to use. 245 * @param outputKeyClass The class of the output key. 246 * @param outputValueClass The class of the output value. 247 * @param job The current job to adjust. Make sure the passed job is carrying all 248 * necessary HBase configuration. 249 * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via 250 * the distributed cache (tmpjars). 251 * @throws IOException When setting up the details fails. 252 */ 253 public static void initTableMapperJob(String table, Scan scan, 254 Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass, 255 Job job, boolean addDependencyJars) throws IOException { 256 initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job, 257 addDependencyJars, getConfiguredInputFormat(job)); 258 } 259 260 /** 261 * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on direct 262 * memory will likely cause the map tasks to OOM when opening the region. This is done here 263 * instead of in TableSnapshotRegionRecordReader in case an advanced user wants to override this 264 * behavior in their job. 265 */ 266 public static void resetCacheConfig(Configuration conf) { 267 conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT); 268 conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f); 269 conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY); 270 } 271 272 /** 273 * Sets up the job for reading from one or more table snapshots, with one or more scans per 274 * snapshot. It bypasses hbase servers and read directly from snapshot files. 275 * @param snapshotScans map of snapshot name to scans on that snapshot. 276 * @param mapper The mapper class to use. 277 * @param outputKeyClass The class of the output key. 278 * @param outputValueClass The class of the output value. 279 * @param job The current job to adjust. Make sure the passed job is carrying all 280 * necessary HBase configuration. 281 * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via 282 * the distributed cache (tmpjars). 283 */ 284 public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans, 285 Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass, 286 Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException { 287 MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir); 288 289 job.setInputFormatClass(MultiTableSnapshotInputFormat.class); 290 if (outputValueClass != null) { 291 job.setMapOutputValueClass(outputValueClass); 292 } 293 if (outputKeyClass != null) { 294 job.setMapOutputKeyClass(outputKeyClass); 295 } 296 job.setMapperClass(mapper); 297 Configuration conf = job.getConfiguration(); 298 HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); 299 300 if (addDependencyJars) { 301 addDependencyJars(job); 302 addDependencyJarsForClasses(job.getConfiguration(), MetricRegistry.class); 303 } 304 305 resetCacheConfig(job.getConfiguration()); 306 } 307 308 /** 309 * Sets up the job for reading from a table snapshot. It bypasses hbase servers and read directly 310 * from snapshot files. 311 * @param snapshotName The name of the snapshot (of a table) to read from. 312 * @param scan The scan instance with the columns, time range etc. 313 * @param mapper The mapper class to use. 314 * @param outputKeyClass The class of the output key. 315 * @param outputValueClass The class of the output value. 316 * @param job The current job to adjust. Make sure the passed job is carrying all 317 * necessary HBase configuration. 318 * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via 319 * the distributed cache (tmpjars). 320 * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user 321 * should have write permissions to this directory, and this should not 322 * be a subdirectory of rootdir. After the job is finished, restore 323 * directory can be deleted. 324 * @throws IOException When setting up the details fails. 325 * @see TableSnapshotInputFormat 326 */ 327 public static void initTableSnapshotMapperJob(String snapshotName, Scan scan, 328 Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass, 329 Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException { 330 TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir); 331 initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, outputValueClass, job, 332 addDependencyJars, false, TableSnapshotInputFormat.class); 333 resetCacheConfig(job.getConfiguration()); 334 } 335 336 /** 337 * Sets up the job for reading from a table snapshot. It bypasses hbase servers and read directly 338 * from snapshot files. 339 * @param snapshotName The name of the snapshot (of a table) to read from. 340 * @param scan The scan instance with the columns, time range etc. 341 * @param mapper The mapper class to use. 342 * @param outputKeyClass The class of the output key. 343 * @param outputValueClass The class of the output value. 344 * @param job The current job to adjust. Make sure the passed job is carrying all 345 * necessary HBase configuration. 346 * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via 347 * the distributed cache (tmpjars). 348 * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user 349 * should have write permissions to this directory, and this should not 350 * be a subdirectory of rootdir. After the job is finished, restore 351 * directory can be deleted. 352 * @param splitAlgo algorithm to split 353 * @param numSplitsPerRegion how many input splits to generate per one region 354 * @throws IOException When setting up the details fails. 355 * @see TableSnapshotInputFormat 356 */ 357 public static void initTableSnapshotMapperJob(String snapshotName, Scan scan, 358 Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass, 359 Job job, boolean addDependencyJars, Path tmpRestoreDir, RegionSplitter.SplitAlgorithm splitAlgo, 360 int numSplitsPerRegion) throws IOException { 361 TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir, splitAlgo, 362 numSplitsPerRegion); 363 initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, outputValueClass, job, 364 addDependencyJars, false, TableSnapshotInputFormat.class); 365 resetCacheConfig(job.getConfiguration()); 366 } 367 368 /** 369 * Use this before submitting a Multi TableMap job. It will appropriately set up the job. 370 * @param scans The list of {@link Scan} objects to read from. 371 * @param mapper The mapper class to use. 372 * @param outputKeyClass The class of the output key. 373 * @param outputValueClass The class of the output value. 374 * @param job The current job to adjust. Make sure the passed job is carrying all 375 * necessary HBase configuration. 376 * @throws IOException When setting up the details fails. 377 */ 378 public static void initTableMapperJob(List<Scan> scans, Class<? extends TableMapper> mapper, 379 Class<?> outputKeyClass, Class<?> outputValueClass, Job job) throws IOException { 380 initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, true); 381 } 382 383 /** 384 * Use this before submitting a Multi TableMap job. It will appropriately set up the job. 385 * @param scans The list of {@link Scan} objects to read from. 386 * @param mapper The mapper class to use. 387 * @param outputKeyClass The class of the output key. 388 * @param outputValueClass The class of the output value. 389 * @param job The current job to adjust. Make sure the passed job is carrying all 390 * necessary HBase configuration. 391 * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via 392 * the distributed cache (tmpjars). 393 * @throws IOException When setting up the details fails. 394 */ 395 public static void initTableMapperJob(List<Scan> scans, Class<? extends TableMapper> mapper, 396 Class<?> outputKeyClass, Class<?> outputValueClass, Job job, boolean addDependencyJars) 397 throws IOException { 398 initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, addDependencyJars, 399 true); 400 } 401 402 /** 403 * Use this before submitting a Multi TableMap job. It will appropriately set up the job. 404 * @param scans The list of {@link Scan} objects to read from. 405 * @param mapper The mapper class to use. 406 * @param outputKeyClass The class of the output key. 407 * @param outputValueClass The class of the output value. 408 * @param job The current job to adjust. Make sure the passed job is carrying all 409 * necessary HBase configuration. 410 * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via 411 * the distributed cache (tmpjars). 412 * @param initCredentials whether to initialize hbase auth credentials for the job 413 * @throws IOException When setting up the details fails. 414 */ 415 public static void initTableMapperJob(List<Scan> scans, Class<? extends TableMapper> mapper, 416 Class<?> outputKeyClass, Class<?> outputValueClass, Job job, boolean addDependencyJars, 417 boolean initCredentials) throws IOException { 418 job.setInputFormatClass(MultiTableInputFormat.class); 419 if (outputValueClass != null) { 420 job.setMapOutputValueClass(outputValueClass); 421 } 422 if (outputKeyClass != null) { 423 job.setMapOutputKeyClass(outputKeyClass); 424 } 425 job.setMapperClass(mapper); 426 Configuration conf = job.getConfiguration(); 427 HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); 428 List<String> scanStrings = new ArrayList<>(); 429 430 for (Scan scan : scans) { 431 scanStrings.add(convertScanToString(scan)); 432 } 433 job.getConfiguration().setStrings(MultiTableInputFormat.SCANS, 434 scanStrings.toArray(new String[scanStrings.size()])); 435 436 if (addDependencyJars) { 437 addDependencyJars(job); 438 } 439 440 if (initCredentials) { 441 initCredentials(job); 442 } 443 } 444 445 public static void initCredentials(Job job) throws IOException { 446 UserProvider userProvider = UserProvider.instantiate(job.getConfiguration()); 447 if (userProvider.isHadoopSecurityEnabled()) { 448 // propagate delegation related props from launcher job to MR job 449 if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) { 450 job.getConfiguration().set("mapreduce.job.credentials.binary", 451 System.getenv("HADOOP_TOKEN_FILE_LOCATION")); 452 } 453 } 454 455 if (userProvider.isHBaseSecurityEnabled()) { 456 try { 457 // init credentials for remote cluster 458 String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS); 459 User user = userProvider.getCurrent(); 460 if (quorumAddress != null) { 461 Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(), 462 quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX); 463 Connection peerConn = ConnectionFactory.createConnection(peerConf); 464 try { 465 TokenUtil.addTokenForJob(peerConn, user, job); 466 } finally { 467 peerConn.close(); 468 } 469 } 470 471 Connection conn = ConnectionFactory.createConnection(job.getConfiguration()); 472 try { 473 TokenUtil.addTokenForJob(conn, user, job); 474 } finally { 475 conn.close(); 476 } 477 } catch (InterruptedException ie) { 478 LOG.info("Interrupted obtaining user authentication token"); 479 Thread.currentThread().interrupt(); 480 } 481 } 482 } 483 484 /** 485 * Obtain an authentication token, for the specified cluster, on behalf of the current user and 486 * add it to the credentials for the given map reduce job. The quorumAddress is the key to the ZK 487 * ensemble, which contains: hbase.zookeeper.quorum, hbase.zookeeper.client.port and 488 * zookeeper.znode.parent 489 * @param job The job that requires the permission. 490 * @param quorumAddress string that contains the 3 required configuratins 491 * @throws IOException When the authentication token cannot be obtained. 492 * @deprecated Since 1.2.0 and will be removed in 3.0.0. Use 493 * {@link #initCredentialsForCluster(Job, Configuration)} instead. 494 * @see #initCredentialsForCluster(Job, Configuration) 495 * @see <a href="https://issues.apache.org/jira/browse/HBASE-14886">HBASE-14886</a> 496 */ 497 @Deprecated 498 public static void initCredentialsForCluster(Job job, String quorumAddress) throws IOException { 499 Configuration peerConf = 500 HBaseConfiguration.createClusterConf(job.getConfiguration(), quorumAddress); 501 initCredentialsForCluster(job, peerConf); 502 } 503 504 /** 505 * Obtain an authentication token, for the specified cluster, on behalf of the current user and 506 * add it to the credentials for the given map reduce job. 507 * @param job The job that requires the permission. 508 * @param conf The configuration to use in connecting to the peer cluster 509 * @throws IOException When the authentication token cannot be obtained. 510 */ 511 public static void initCredentialsForCluster(Job job, Configuration conf) throws IOException { 512 UserProvider userProvider = UserProvider.instantiate(conf); 513 if (userProvider.isHBaseSecurityEnabled()) { 514 try { 515 Connection peerConn = ConnectionFactory.createConnection(conf); 516 try { 517 TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job); 518 } finally { 519 peerConn.close(); 520 } 521 } catch (InterruptedException e) { 522 LOG.info("Interrupted obtaining user authentication token"); 523 Thread.interrupted(); 524 } 525 } 526 } 527 528 /** 529 * Writes the given scan into a Base64 encoded string. 530 * @param scan The scan to write out. 531 * @return The scan saved in a Base64 encoded string. 532 * @throws IOException When writing the scan fails. 533 */ 534 public static String convertScanToString(Scan scan) throws IOException { 535 ClientProtos.Scan proto = ProtobufUtil.toScan(scan); 536 return Bytes.toString(Base64.getEncoder().encode(proto.toByteArray())); 537 } 538 539 /** 540 * Converts the given Base64 string back into a Scan instance. 541 * @param base64 The scan details. 542 * @return The newly created Scan instance. 543 * @throws IOException When reading the scan instance fails. 544 */ 545 public static Scan convertStringToScan(String base64) throws IOException { 546 byte[] decoded = Base64.getDecoder().decode(base64); 547 return ProtobufUtil.toScan(ClientProtos.Scan.parseFrom(decoded)); 548 } 549 550 /** 551 * Use this before submitting a TableReduce job. It will appropriately set up the JobConf. 552 * @param table The output table. 553 * @param reducer The reducer class to use. 554 * @param job The current job to adjust. 555 * @throws IOException When determining the region count fails. 556 */ 557 public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer, 558 Job job) throws IOException { 559 initTableReducerJob(table, reducer, job, null); 560 } 561 562 /** 563 * Use this before submitting a TableReduce job. It will appropriately set up the JobConf. 564 * @param table The output table. 565 * @param reducer The reducer class to use. 566 * @param job The current job to adjust. 567 * @param partitioner Partitioner to use. Pass <code>null</code> to use default partitioner. 568 * @throws IOException When determining the region count fails. 569 */ 570 public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer, 571 Job job, Class partitioner) throws IOException { 572 initTableReducerJob(table, reducer, job, partitioner, null, null, null); 573 } 574 575 /** 576 * Use this before submitting a TableReduce job. It will appropriately set up the JobConf. 577 * @param table The output table. 578 * @param reducer The reducer class to use. 579 * @param job The current job to adjust. Make sure the passed job is carrying all 580 * necessary HBase configuration. 581 * @param partitioner Partitioner to use. Pass <code>null</code> to use default partitioner. 582 * @param quorumAddress Distant cluster to write to; default is null for output to the cluster 583 * that is designated in <code>hbase-site.xml</code>. Set this String to the 584 * zookeeper ensemble of an alternate remote cluster when you would have the 585 * reduce write a cluster that is other than the default; e.g. copying tables 586 * between clusters, the source would be designated by 587 * <code>hbase-site.xml</code> and this param would have the ensemble address 588 * of the remote cluster. The format to pass is particular. Pass 589 * <code> <hbase.zookeeper.quorum>:< 590 * hbase.zookeeper.client.port>:<zookeeper.znode.parent> 591 * </code> such as <code>server,server2,server3:2181:/hbase</code>. 592 * @param serverClass redefined hbase.regionserver.class 593 * @param serverImpl redefined hbase.regionserver.impl 594 * @throws IOException When determining the region count fails. 595 */ 596 public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer, 597 Job job, Class partitioner, String quorumAddress, String serverClass, String serverImpl) 598 throws IOException { 599 initTableReducerJob(table, reducer, job, partitioner, quorumAddress, serverClass, serverImpl, 600 true); 601 } 602 603 /** 604 * Use this before submitting a TableReduce job. It will appropriately set up the JobConf. 605 * @param table The output table. 606 * @param reducer The reducer class to use. 607 * @param job The current job to adjust. Make sure the passed job is carrying all 608 * necessary HBase configuration. 609 * @param partitioner Partitioner to use. Pass <code>null</code> to use default partitioner. 610 * @param quorumAddress Distant cluster to write to; default is null for output to the cluster 611 * that is designated in <code>hbase-site.xml</code>. Set this String to 612 * the zookeeper ensemble of an alternate remote cluster when you would 613 * have the reduce write a cluster that is other than the default; e.g. 614 * copying tables between clusters, the source would be designated by 615 * <code>hbase-site.xml</code> and this param would have the ensemble 616 * address of the remote cluster. The format to pass is particular. Pass 617 * <code> <hbase.zookeeper.quorum>:< 618 * hbase.zookeeper.client.port>:<zookeeper.znode.parent> 619 * </code> such as <code>server,server2,server3:2181:/hbase</code>. 620 * @param serverClass redefined hbase.regionserver.class 621 * @param serverImpl redefined hbase.regionserver.impl 622 * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via 623 * the distributed cache (tmpjars). 624 * @throws IOException When determining the region count fails. 625 */ 626 public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer, 627 Job job, Class partitioner, String quorumAddress, String serverClass, String serverImpl, 628 boolean addDependencyJars) throws IOException { 629 630 Configuration conf = job.getConfiguration(); 631 HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); 632 job.setOutputFormatClass(TableOutputFormat.class); 633 if (reducer != null) job.setReducerClass(reducer); 634 conf.set(TableOutputFormat.OUTPUT_TABLE, table); 635 conf.setStrings("io.serializations", conf.get("io.serializations"), 636 MutationSerialization.class.getName(), ResultSerialization.class.getName()); 637 // If passed a quorum/ensemble address, pass it on to TableOutputFormat. 638 if (quorumAddress != null) { 639 // Calling this will validate the format 640 ZKConfig.validateClusterKey(quorumAddress); 641 conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress); 642 } 643 if (serverClass != null && serverImpl != null) { 644 conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass); 645 conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl); 646 } 647 job.setOutputKeyClass(ImmutableBytesWritable.class); 648 job.setOutputValueClass(Writable.class); 649 if (partitioner == HRegionPartitioner.class) { 650 job.setPartitionerClass(HRegionPartitioner.class); 651 int regions = getRegionCount(conf, TableName.valueOf(table)); 652 if (job.getNumReduceTasks() > regions) { 653 job.setNumReduceTasks(regions); 654 } 655 } else if (partitioner != null) { 656 job.setPartitionerClass(partitioner); 657 } 658 659 if (addDependencyJars) { 660 addDependencyJars(job); 661 } 662 663 initCredentials(job); 664 } 665 666 /** 667 * Ensures that the given number of reduce tasks for the given job configuration does not exceed 668 * the number of regions for the given table. 669 * @param table The table to get the region count for. 670 * @param job The current job to adjust. 671 * @throws IOException When retrieving the table details fails. 672 */ 673 public static void limitNumReduceTasks(String table, Job job) throws IOException { 674 int regions = getRegionCount(job.getConfiguration(), TableName.valueOf(table)); 675 if (job.getNumReduceTasks() > regions) { 676 job.setNumReduceTasks(regions); 677 } 678 } 679 680 /** 681 * Sets the number of reduce tasks for the given job configuration to the number of regions the 682 * given table has. 683 * @param table The table to get the region count for. 684 * @param job The current job to adjust. 685 * @throws IOException When retrieving the table details fails. 686 */ 687 public static void setNumReduceTasks(String table, Job job) throws IOException { 688 job.setNumReduceTasks(getRegionCount(job.getConfiguration(), TableName.valueOf(table))); 689 } 690 691 /** 692 * Sets the number of rows to return and cache with each scanner iteration. Higher caching values 693 * will enable faster mapreduce jobs at the expense of requiring more heap to contain the cached 694 * rows. 695 * @param job The current job to adjust. 696 * @param batchSize The number of rows to return in batch with each scanner iteration. 697 */ 698 public static void setScannerCaching(Job job, int batchSize) { 699 job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize); 700 } 701 702 /** 703 * Add HBase and its dependencies (only) to the job configuration. 704 * <p> 705 * This is intended as a low-level API, facilitating code reuse between this class and its mapred 706 * counterpart. It also of use to external tools that need to build a MapReduce job that interacts 707 * with HBase but want fine-grained control over the jars shipped to the cluster. 708 * </p> 709 * @param conf The Configuration object to extend with dependencies. 710 * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil 711 * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a> 712 */ 713 public static void addHBaseDependencyJars(Configuration conf) throws IOException { 714 addDependencyJarsForClasses(conf, 715 // explicitly pull a class from each module 716 org.apache.hadoop.hbase.HConstants.class, // hbase-common 717 org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol 718 org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.class, // hbase-protocol-shaded 719 org.apache.hadoop.hbase.client.Put.class, // hbase-client 720 org.apache.hadoop.hbase.ipc.RpcServer.class, // hbase-server 721 org.apache.hadoop.hbase.CompatibilityFactory.class, // hbase-hadoop-compat 722 org.apache.hadoop.hbase.mapreduce.JobUtil.class, // hbase-hadoop2-compat 723 org.apache.hadoop.hbase.mapreduce.TableMapper.class, // hbase-mapreduce 724 org.apache.hadoop.hbase.metrics.impl.FastLongHistogram.class, // hbase-metrics 725 org.apache.hadoop.hbase.metrics.Snapshot.class, // hbase-metrics-api 726 org.apache.hadoop.hbase.replication.ReplicationUtils.class, // hbase-replication 727 org.apache.hadoop.hbase.http.HttpServer.class, // hbase-http 728 org.apache.hadoop.hbase.procedure2.Procedure.class, // hbase-procedure 729 org.apache.hadoop.hbase.zookeeper.ZKWatcher.class, // hbase-zookeeper 730 org.apache.hbase.thirdparty.com.google.common.collect.Lists.class, // hb-shaded-miscellaneous 731 org.apache.hbase.thirdparty.com.google.gson.GsonBuilder.class, // hbase-shaded-gson 732 org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations.class, // hb-sh-protobuf 733 org.apache.hbase.thirdparty.io.netty.channel.Channel.class, // hbase-shaded-netty 734 org.apache.hadoop.hbase.unsafe.HBasePlatformDependent.class, // hbase-unsafe 735 org.apache.zookeeper.ZooKeeper.class, // zookeeper 736 com.google.protobuf.Message.class, // protobuf 737 com.codahale.metrics.MetricRegistry.class, // metrics-core 738 org.apache.commons.lang3.ArrayUtils.class, // commons-lang 739 io.opentelemetry.api.trace.Span.class, // opentelemetry-api 740 io.opentelemetry.semconv.trace.attributes.SemanticAttributes.class, // opentelemetry-semconv 741 io.opentelemetry.context.Context.class); // opentelemetry-context 742 } 743 744 /** 745 * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}. Also 746 * exposed to shell scripts via `bin/hbase mapredcp`. 747 */ 748 public static String buildDependencyClasspath(Configuration conf) { 749 if (conf == null) { 750 throw new IllegalArgumentException("Must provide a configuration object."); 751 } 752 Set<String> paths = new HashSet<>(conf.getStringCollection("tmpjars")); 753 if (paths.isEmpty()) { 754 throw new IllegalArgumentException("Configuration contains no tmpjars."); 755 } 756 StringBuilder sb = new StringBuilder(); 757 for (String s : paths) { 758 // entries can take the form 'file:/path/to/file.jar'. 759 int idx = s.indexOf(":"); 760 if (idx != -1) s = s.substring(idx + 1); 761 if (sb.length() > 0) sb.append(File.pathSeparator); 762 sb.append(s); 763 } 764 return sb.toString(); 765 } 766 767 /** 768 * Add the HBase dependency jars as well as jars for any of the configured job classes to the job 769 * configuration, so that JobClient will ship them to the cluster and add them to the 770 * DistributedCache. 771 */ 772 public static void addDependencyJars(Job job) throws IOException { 773 addHBaseDependencyJars(job.getConfiguration()); 774 try { 775 addDependencyJarsForClasses(job.getConfiguration(), 776 // when making changes here, consider also mapred.TableMapReduceUtil 777 // pull job classes 778 job.getMapOutputKeyClass(), job.getMapOutputValueClass(), job.getInputFormatClass(), 779 job.getOutputKeyClass(), job.getOutputValueClass(), job.getOutputFormatClass(), 780 job.getPartitionerClass(), job.getCombinerClass()); 781 } catch (ClassNotFoundException e) { 782 throw new IOException(e); 783 } 784 } 785 786 /** 787 * Add the jars containing the given classes to the job's configuration such that JobClient will 788 * ship them to the cluster and add them to the DistributedCache. 789 * @deprecated since 1.3.0 and will be removed in 3.0.0. Use {@link #addDependencyJars(Job)} 790 * instead. 791 * @see #addDependencyJars(Job) 792 * @see <a href="https://issues.apache.org/jira/browse/HBASE-8386">HBASE-8386</a> 793 */ 794 @Deprecated 795 public static void addDependencyJars(Configuration conf, Class<?>... classes) throws IOException { 796 LOG.warn("The addDependencyJars(Configuration, Class<?>...) method has been deprecated since it" 797 + " is easy to use incorrectly. Most users should rely on addDependencyJars(Job) " 798 + "instead. See HBASE-8386 for more details."); 799 addDependencyJarsForClasses(conf, classes); 800 } 801 802 /** 803 * Add the jars containing the given classes to the job's configuration such that JobClient will 804 * ship them to the cluster and add them to the DistributedCache. N.B. that this method at most 805 * adds one jar per class given. If there is more than one jar available containing a class with 806 * the same name as a given class, we don't define which of those jars might be chosen. 807 * @param conf The Hadoop Configuration to modify 808 * @param classes will add just those dependencies needed to find the given classes 809 * @throws IOException if an underlying library call fails. 810 */ 811 @InterfaceAudience.Private 812 public static void addDependencyJarsForClasses(Configuration conf, Class<?>... classes) 813 throws IOException { 814 815 FileSystem localFs = FileSystem.getLocal(conf); 816 Set<String> jars = new HashSet<>(); 817 // Add jars that are already in the tmpjars variable 818 jars.addAll(conf.getStringCollection("tmpjars")); 819 820 // add jars as we find them to a map of contents jar name so that we can avoid 821 // creating new jars for classes that have already been packaged. 822 Map<String, String> packagedClasses = new HashMap<>(); 823 824 // Add jars containing the specified classes 825 for (Class<?> clazz : classes) { 826 if (clazz == null) continue; 827 828 Path path = findOrCreateJar(clazz, localFs, packagedClasses); 829 if (path == null) { 830 LOG.warn("Could not find jar for class " + clazz + " in order to ship it to the cluster."); 831 continue; 832 } 833 if (!localFs.exists(path)) { 834 LOG.warn("Could not validate jar file " + path + " for class " + clazz); 835 continue; 836 } 837 jars.add(path.toString()); 838 } 839 if (jars.isEmpty()) return; 840 841 conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()]))); 842 } 843 844 /** 845 * Finds the Jar for a class or creates it if it doesn't exist. If the class is in a directory in 846 * the classpath, it creates a Jar on the fly with the contents of the directory and returns the 847 * path to that Jar. If a Jar is created, it is created in the system temporary directory. 848 * Otherwise, returns an existing jar that contains a class of the same name. Maintains a mapping 849 * from jar contents to the tmp jar created. 850 * @param my_class the class to find. 851 * @param fs the FileSystem with which to qualify the returned path. 852 * @param packagedClasses a map of class name to path. 853 * @return a jar file that contains the class. 854 */ 855 private static Path findOrCreateJar(Class<?> my_class, FileSystem fs, 856 Map<String, String> packagedClasses) throws IOException { 857 // attempt to locate an existing jar for the class. 858 String jar = findContainingJar(my_class, packagedClasses); 859 if (null == jar || jar.isEmpty()) { 860 jar = getJar(my_class); 861 updateMap(jar, packagedClasses); 862 } 863 864 if (null == jar || jar.isEmpty()) { 865 return null; 866 } 867 868 LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar)); 869 return new Path(jar).makeQualified(fs.getUri(), fs.getWorkingDirectory()); 870 } 871 872 /** 873 * Add entries to <code>packagedClasses</code> corresponding to class files contained in 874 * <code>jar</code>. 875 * @param jar The jar who's content to list. 876 * @param packagedClasses map[class -> jar] 877 */ 878 private static void updateMap(String jar, Map<String, String> packagedClasses) 879 throws IOException { 880 if (null == jar || jar.isEmpty()) { 881 return; 882 } 883 ZipFile zip = null; 884 try { 885 zip = new ZipFile(jar); 886 for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) { 887 ZipEntry entry = iter.nextElement(); 888 if (entry.getName().endsWith("class")) { 889 packagedClasses.put(entry.getName(), jar); 890 } 891 } 892 } finally { 893 if (null != zip) zip.close(); 894 } 895 } 896 897 /** 898 * Find a jar that contains a class of the same name, if any. It will return a jar file, even if 899 * that is not the first thing on the class path that has a class with the same name. Looks first 900 * on the classpath and then in the <code>packagedClasses</code> map. 901 * @param my_class the class to find. 902 * @return a jar file that contains the class, or null. 903 */ 904 private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses) 905 throws IOException { 906 ClassLoader loader = my_class.getClassLoader(); 907 908 String class_file = my_class.getName().replaceAll("\\.", "/") + ".class"; 909 910 if (loader != null) { 911 // first search the classpath 912 for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) { 913 URL url = itr.nextElement(); 914 if ("jar".equals(url.getProtocol())) { 915 String toReturn = url.getPath(); 916 if (toReturn.startsWith("file:")) { 917 toReturn = toReturn.substring("file:".length()); 918 } 919 // URLDecoder is a misnamed class, since it actually decodes 920 // x-www-form-urlencoded MIME type rather than actual 921 // URL encoding (which the file path has). Therefore it would 922 // decode +s to ' 's which is incorrect (spaces are actually 923 // either unencoded or encoded as "%20"). Replace +s first, so 924 // that they are kept sacred during the decoding process. 925 toReturn = toReturn.replaceAll("\\+", "%2B"); 926 toReturn = URLDecoder.decode(toReturn, "UTF-8"); 927 return toReturn.replaceAll("!.*$", ""); 928 } 929 } 930 } 931 932 // now look in any jars we've packaged using JarFinder. Returns null when 933 // no jar is found. 934 return packagedClasses.get(class_file); 935 } 936 937 /** 938 * Invoke 'getJar' on a custom JarFinder implementation. Useful for some job configuration 939 * contexts (HBASE-8140) and also for testing on MRv2. check if we have HADOOP-9426. 940 * @param my_class the class to find. 941 * @return a jar file that contains the class, or null. 942 */ 943 private static String getJar(Class<?> my_class) { 944 String ret = null; 945 try { 946 ret = JarFinder.getJar(my_class); 947 } catch (Exception e) { 948 // toss all other exceptions, related to reflection failure 949 throw new RuntimeException("getJar invocation failed.", e); 950 } 951 952 return ret; 953 } 954 955 private static int getRegionCount(Configuration conf, TableName tableName) throws IOException { 956 try (Connection conn = ConnectionFactory.createConnection(conf); 957 RegionLocator locator = conn.getRegionLocator(tableName)) { 958 return locator.getAllRegionLocations().size(); 959 } 960 } 961}