/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Utility for {@link TableMap} and {@link TableReduce}
 */
@InterfaceAudience.Public
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TableMapReduceUtil {

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the JobConf.
   * @param table            The table name to read from.
   * @param columns          The columns to scan.
   * @param mapper           The mapper class to use.
   * @param outputKeyClass   The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job              The current job configuration to adjust.
   */
  public static void initTableMapJob(String table, String columns, Class<? extends TableMap> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job, true,
      TableInputFormat.class);
  }

  public static void initTableMapJob(String table, String columns, Class<? extends TableMap> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the JobConf.
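   * <p>
   * A minimal usage sketch; {@code MyMap} and the table/column names below are placeholders,
   * not part of this API:
   * </p>
   *
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * job.setJarByClass(MyMap.class);
   * TableMapReduceUtil.initTableMapJob("myTable", "info:name info:age", MyMap.class,
   *   ImmutableBytesWritable.class, Result.class, job, true, TableInputFormat.class);
   * JobClient.runJob(job);
   * }</pre>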
   * @param table             The table name to read from.
   * @param columns           The columns to scan.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param inputFormat       The input format class to use.
   */
  public static void initTableMapJob(String table, String columns, Class<? extends TableMap> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
    Class<? extends InputFormat> inputFormat) {

    job.setInputFormat(inputFormat);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    job.setStrings("io.serializations", job.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName());
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
    try {
      initCredentials(job);
    } catch (IOException ioe) {
      // just spit out the stack trace? really?
      ioe.printStackTrace();
    }
  }

  /**
   * Sets up the job for reading from one or more table snapshots, with one or more scans per
   * snapshot. It bypasses hbase servers and reads directly from snapshot files.
   * @param snapshotScans     map of snapshot name to scans on that snapshot.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param tmpRestoreDir     a temporary directory to copy the snapshot files into. Current user
   *                          should have write permissions to this directory, and this should not
   *                          be a subdirectory of rootdir. After the job is finished, restore
   *                          directory can be deleted.
   */
  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
    Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir);

    job.setInputFormat(MultiTableSnapshotInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    if (addDependencyJars) {
      addDependencyJars(job);
    }

    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses hbase servers and reads
   * directly from snapshot files.
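   * <p>
   * A minimal usage sketch; {@code MySnapshotMap}, the snapshot name and the restore path are
   * placeholders, not part of this API:
   * </p>
   *
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * Path restoreDir = new Path("/tmp/snapshot-restore"); // must not be under the HBase root dir
   * TableMapReduceUtil.initTableSnapshotMapJob("mySnapshot", "info:name", MySnapshotMap.class,
   *   ImmutableBytesWritable.class, Result.class, job, true, restoreDir);
   * }</pre>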
   * @param snapshotName      The name of the snapshot (of a table) to read from.
   * @param columns           The columns to scan.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param tmpRestoreDir     a temporary directory to copy the snapshot files into. Current user
   *                          should have write permissions to this directory, and this should not
   *                          be a subdirectory of rootdir. After the job is finished, restore
   *                          directory can be deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
    Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses hbase servers and reads
   * directly from snapshot files.
   * @param snapshotName       The name of the snapshot (of a table) to read from.
   * @param columns            The columns to scan.
   * @param mapper             The mapper class to use.
   * @param outputKeyClass     The class of the output key.
   * @param outputValueClass   The class of the output value.
   * @param jobConf            The current job to adjust. Make sure the passed job is carrying all
   *                           necessary HBase configuration.
   * @param addDependencyJars  upload HBase jars and jars for any of the configured job classes via
   *                           the distributed cache (tmpjars).
   * @param tmpRestoreDir      a temporary directory to copy the snapshot files into. Current user
   *                           should have write permissions to this directory, and this should not
   *                           be a subdirectory of rootdir. After the job is finished, restore
   *                           directory can be deleted.
   * @param splitAlgo          algorithm to split
   * @param numSplitsPerRegion how many input splits to generate per one region
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
    Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    JobConf jobConf, boolean addDependencyJars, Path tmpRestoreDir,
    RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {
    TableSnapshotInputFormat.setInput(jobConf, snapshotName, tmpRestoreDir, splitAlgo,
      numSplitsPerRegion);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, jobConf,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(jobConf);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table   The output table.
   * @param reducer The reducer class to use.
   * @param job     The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table, Class<? extends TableReduce> reducer,
    JobConf job) throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
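   * <p>
   * A minimal usage sketch; {@code MyReduce} and the table name are placeholders, not part of
   * this API:
   * </p>
   *
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * TableMapReduceUtil.initTableReduceJob("outputTable", MyReduce.class, job,
   *   HRegionPartitioner.class);
   * }</pre>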
   * @param table       The output table.
   * @param reducer     The reducer class to use.
   * @param job         The current job configuration to adjust.
   * @param partitioner Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table, Class<? extends TableReduce> reducer,
    JobConf job, Class partitioner) throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table             The output table.
   * @param reducer           The reducer class to use.
   * @param job               The current job configuration to adjust.
   * @param partitioner       Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table, Class<? extends TableReduce> reducer,
    JobConf job, Class partitioner, boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setStrings("io.serializations", job.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName());
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    initCredentials(job);
  }

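  /**
   * Obtains security credentials for the configured cluster (propagating any delegation tokens
   * from the launching context and, when HBase security is enabled, requesting an HBase delegation
   * token for the current user) and stores them in the given job so that tasks can authenticate.
   * @param job The current job configuration to adjust.
   * @throws IOException When obtaining the credentials fails.
   */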
  public static void initCredentials(JobConf job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job);
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      Connection conn = ConnectionFactory.createConnection(job);
      try {
        // login the server principal (if using secure Hadoop)
        User user = userProvider.getCurrent();
        TokenUtil.addTokenForJob(conn, job, user);
      } catch (InterruptedException ie) {
        ie.printStackTrace();
        Thread.currentThread().interrupt();
      } finally {
        conn.close();
      }
    }
  }

  /**
   * Ensures that the given number of reduce tasks for the given job configuration does not exceed
   * the number of regions for the given table.
   * @param table The table to get the region count for.
   * @param job   The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumReduceTasks(String table, JobConf job) throws IOException {
    int regions = getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  }

  /**
   * Ensures that the given number of map tasks for the given job configuration does not exceed the
   * number of regions for the given table.
   * @param table The table to get the region count for.
   * @param job   The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumMapTasks(String table, JobConf job) throws IOException {
    int regions = getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumMapTasks() > regions) {
      job.setNumMapTasks(regions);
    }
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the number of regions the
   * given table has.
   * @param table The table to get the region count for.
   * @param job   The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job) throws IOException {
    job.setNumReduceTasks(getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table)));
  }

  /**
   * Sets the number of map tasks for the given job configuration to the number of regions the
   * given table has.
   * @param table The table to get the region count for.
   * @param job   The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job) throws IOException {
    job.setNumMapTasks(getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table)));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration. Higher caching values
   * will enable faster mapreduce jobs at the expense of requiring more heap to contain the cached
   * rows.
   * @param job       The current job configuration to adjust.
   * @param batchSize The number of rows to return in batch with each scanner iteration.
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }

  /**
   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
   */
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(job,
      job.getMapOutputKeyClass(), job.getMapOutputValueClass(), job.getOutputKeyClass(),
      job.getOutputValueClass(), job.getPartitionerClass(),
      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
      job.getCombinerClass());
  }

  private static int getRegionCount(Configuration conf, TableName tableName) throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf);
      RegionLocator locator = conn.getRegionLocator(tableName)) {
      return locator.getAllRegionLocations().size();
    }
  }
}