/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles
 * for later bulk importing.
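 * <p>
 * An illustrative invocation (input path, output path and table name below are placeholders, not
 * values shipped with the tool), typically run via the {@code hbase} launcher script, might look
 * like:
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob \
 *   -Dhfile.bulk.output=/tmp/hfile-splitter-out \
 *   /backup/hfiles/mytable mytable
 * </pre>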
 */
@InterfaceAudience.Private
public class MapReduceHFileSplitterJob extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(MapReduceHFileSplitterJob.class);
  final static String NAME = "HFileSplitterJob";
  public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
  public final static String TABLES_KEY = "hfile.input.tables";
  public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  public MapReduceHFileSplitterJob() {
  }

  protected MapReduceHFileSplitterJob(final Configuration c) {
    super(c);
  }

  /**
   * A mapper that just writes out cells. This one can be used together with
   * {@link CellSortReducer}.
   */
  static class HFileCellMapper extends Mapper<NullWritable, Cell, ImmutableBytesWritable, Cell> {

    @Override
    public void map(NullWritable key, Cell value, Context context)
      throws IOException, InterruptedException {
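      // Key on the row so CellSortReducer groups and sorts all cells of a row together. Wrapping
      // the cell as an extended cell preserves its sequenceId, which the extended cell
      // serialization enabled in createSubmittableJob() relies on.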
      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)),
        new MapReduceExtendedCell(PrivateCellUtil.ensureExtendedCell(value)));
    }

    @Override
    public void setup(Context context) throws IOException {
      // do nothing
    }
  }

  /**
   * Sets up the actual job.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    Configuration conf = getConf();
    String inputDirs = args[0];
    String tabName = args[1];
    conf.setStrings(TABLES_KEY, tabName);
    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
    Job job = Job.getInstance(conf,
      conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
    // MapReduceHFileSplitter needs ExtendedCellSerialization so that sequenceId can be propagated
    // when sorting cells in CellSortReducer
    job.getConfiguration().setBoolean(HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
      true);
    job.setJarByClass(MapReduceHFileSplitterJob.class);
    job.setInputFormatClass(HFileInputFormat.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
    if (hfileOutPath != null) {
      LOG.debug("Adding incremental load job, output: " + hfileOutPath + ", input: " + inputDirs);
      TableName tableName = TableName.valueOf(tabName);
      job.setMapperClass(HFileCellMapper.class);
      job.setReducerClass(CellSortReducer.class);
      Path outputDir = new Path(hfileOutPath);
      FileOutputFormat.setOutputPath(job, outputDir);
      job.setMapOutputValueClass(MapReduceExtendedCell.class);
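      // configureIncrementalLoad inspects the table's current regions to set up a total order
      // partitioner and a matching reducer count, so the generated HFiles line up with the
      // table's region boundaries and can be bulk loaded afterwards.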
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
      }
      LOG.debug("Successfully configured incremental load job");

      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
    } else {
      throw new IOException("No bulk output directory specified");
    }
    return job;
  }

  /**
   * Print usage
   * @param errorMsg Error message. Can be null.
   */
  private void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>");
    System.err.println("Read all HFiles for <table> and split them to <table> region boundaries.");
    System.err.println("<table>  table to load.\n");
    System.err.println("To generate HFiles for a bulk data load, pass the option:");
    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
    System.err.println("Other options:");
    System.err.println("   -D " + JOB_NAME_CONF_KEY
      + "=jobName - use the specified mapreduce job name for the HFile splitter");
    System.err.println("For performance also consider the following options:\n"
      + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false");
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new MapReduceHFileSplitterJob(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      usage("Wrong number of arguments: " + args.length);
      return -1;
    }
    Job job = createSubmittableJob(args);
    return job.waitForCompletion(true) ? 0 : 1;
  }
}