/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Import data written by {@link Export}.
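 * <p>
 * A typical command-line invocation looks like the following (a minimal sketch; the table name and
 * input directory are illustrative):
 *
 * <pre>
 * $ hbase org.apache.hadoop.hbase.mapreduce.Import mytable /export/output
 * </pre>
 *
 * To generate HFiles for bulk loading instead of writing directly to the table, additionally pass
 * {@code -Dimport.bulk.output=/path/for/output}.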
 */
@InterfaceAudience.Public
public class Import extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(Import.class);
  final static String NAME = "import";
  public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
  public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
  public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
  public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
  public final static String TABLE_NAME = "import.table.name";
  public final static String WAL_DURABILITY = "import.wal.durability";
  public final static String HAS_LARGE_RESULT = "import.bulk.hasLargeResult";

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  public static class CellWritableComparablePartitioner
    extends Partitioner<CellWritableComparable, Cell> {
    private static CellWritableComparable[] START_KEYS = null;

    @Override
    public int getPartition(CellWritableComparable key, Cell value, int numPartitions) {
      for (int i = 0; i < START_KEYS.length; ++i) {
        if (key.compareTo(START_KEYS[i]) <= 0) {
          return i;
        }
      }
      return START_KEYS.length;
    }

  }

  public static class CellWritableComparable
    implements WritableComparable<CellWritableComparable> {

    private ExtendedCell kv = null;

    static {
      // register this comparator
      WritableComparator.define(CellWritableComparable.class, new CellWritableComparator());
    }

    public CellWritableComparable() {
    }

    public CellWritableComparable(Cell kv) {
      this.kv = (ExtendedCell) kv;
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(PrivateCellUtil.estimatedSerializedSizeOfKey(kv));
      out.writeInt(0);
      PrivateCellUtil.writeFlatKey(kv, out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      kv = KeyValue.create(in);
    }

    @Override
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
        justification = "This is wrong, yes, but we should be purging Writables, not fixing them")
    public int compareTo(CellWritableComparable o) {
      return CellComparator.getInstance().compare(this.kv, o.kv);
    }

    public static class CellWritableComparator extends WritableComparator {

      @Override
      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
          CellWritableComparable kv1 = new CellWritableComparable();
          kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
          CellWritableComparable kv2 = new CellWritableComparable();
          kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
          return compare(kv1, kv2);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }

    }

  }

  public static class CellReducer
    extends Reducer<CellWritableComparable, Cell, ImmutableBytesWritable, Cell> {
    protected void reduce(CellWritableComparable row, Iterable<Cell> kvs,
      Reducer<CellWritableComparable, Cell, ImmutableBytesWritable, Cell>.Context context)
      throws java.io.IOException, InterruptedException {
      int index = 0;
      for (Cell kv : kvs) {
        context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)),
          new MapReduceExtendedCell(PrivateCellUtil.ensureExtendedCell(kv)));
        if (++index % 100 == 0) {
          context.setStatus("Wrote " + index + " KeyValues; current row key: "
            + Bytes.toString(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
        }
      }
    }
  }

  public static class CellSortImporter extends TableMapper<CellWritableComparable, Cell> {
    private Map<byte[], byte[]> cfRenameMap;
    private Filter filter;
    private static final Logger LOG = LoggerFactory.getLogger(CellSortImporter.class);

    /**
     * @param row     The current table row key.
     * @param value   The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace(
            "Considering the row: " + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
        }
        if (
          filter == null || !filter.filterRowKey(
            PrivateCellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))
        ) {
          for (ExtendedCell kv : ClientInternalHelper.getExtendedRawCells(value)) {
            kv = filterKv(filter, kv);
            // skip if we filtered it out
            if (kv == null) {
              continue;
            }
            Cell ret = convertKv(kv, cfRenameMap);
            context.write(new CellWritableComparable(ret), ret);
          }
        }
      } catch (InterruptedException e) {
        LOG.error("Interrupted while emitting Cell", e);
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public void setup(Context context) throws IOException {
      cfRenameMap = createCfRenameMap(context.getConfiguration());
      filter = instantiateFilter(context.getConfiguration());
      int reduceNum = context.getNumReduceTasks();
      Configuration conf = context.getConfiguration();
      TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME));
      try (Connection conn = ConnectionFactory.createConnection(conf);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        byte[][] startKeys = regionLocator.getStartKeys();
        if (startKeys.length != reduceNum) {
          throw new IOException("Region split after job initialization");
        }
        CellWritableComparable[] startKeyWraps = new CellWritableComparable[startKeys.length - 1];
        for (int i = 1; i < startKeys.length; ++i) {
          startKeyWraps[i - 1] =
            new CellWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
        }
        CellWritableComparablePartitioner.START_KEYS = startKeyWraps;
      }
    }
  }

  /**
   * A mapper that just writes out KeyValues.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
      justification = "Writables are going away and this has been this way forever")
  public static class CellImporter extends TableMapper<ImmutableBytesWritable, Cell> {
    private Map<byte[], byte[]> cfRenameMap;
    private Filter filter;
    private static final Logger LOG = LoggerFactory.getLogger(CellImporter.class);

    /**
     * @param row     The current table row key.
     * @param value   The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace(
            "Considering the row: " + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
        }
        if (
          filter == null || !filter.filterRowKey(
            PrivateCellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))
        ) {
          for (ExtendedCell kv : ClientInternalHelper.getExtendedRawCells(value)) {
            kv = filterKv(filter, kv);
            // skip if we filtered it out
            if (kv == null) {
              continue;
            }
            context.write(row, new MapReduceExtendedCell(convertKv(kv, cfRenameMap)));
          }
        }
      } catch (InterruptedException e) {
        LOG.error("Interrupted while emitting Cell", e);
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public void setup(Context context) {
      cfRenameMap = createCfRenameMap(context.getConfiguration());
      filter = instantiateFilter(context.getConfiguration());
    }
  }

  /**
   * A mapper that writes the imported data back to the target table as {@link Mutation Mutations}.
   */
  public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
    private Map<byte[], byte[]> cfRenameMap;
    private List<UUID> clusterIds;
    private Filter filter;
    private Durability durability;

    /**
     * @param row     The current table row key.
     * @param value   The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        writeResult(row, value, context);
      } catch (InterruptedException e) {
        LOG.error("Interrupted while writing result", e);
        Thread.currentThread().interrupt();
      }
    }

    private void writeResult(ImmutableBytesWritable key, Result result, Context context)
      throws IOException, InterruptedException {
      Put put = null;
      Delete delete = null;
      if (LOG.isTraceEnabled()) {
        LOG.trace(
          "Considering the row: " + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
      }
      if (
        filter == null || !filter.filterRowKey(
          PrivateCellUtil.createFirstOnRow(key.get(), key.getOffset(), (short) key.getLength()))
      ) {
        processKV(key, result, context, put, delete);
      }
    }

    protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
      Delete delete) throws IOException, InterruptedException {
      for (ExtendedCell kv : ClientInternalHelper.getExtendedRawCells(result)) {
        kv = filterKv(filter, kv);
        // skip if we filtered it out
        if (kv == null) {
          continue;
        }

        kv = convertKv(kv, cfRenameMap);
        // Deletes and Puts are gathered and written when finished
        /*
         * If the Export contains a sequence of mutations and tombstones, the Import should restore
         * the same sequence as-is. If we combined all Delete tombstones into a single request, some
         * DeleteFamily tombstones could be lost: when multiple DeleteFamily tombstones are
         * submitted in a single Delete request, only the newest one is kept in the hbase table and
         * the others are ignored. See HBASE-12065.
         */
        if (PrivateCellUtil.isDeleteFamily(kv)) {
          Delete deleteFamily = new Delete(key.get());
          deleteFamily.add(kv);
          if (durability != null) {
            deleteFamily.setDurability(durability);
          }
          deleteFamily.setClusterIds(clusterIds);
          context.write(key, deleteFamily);
        } else if (CellUtil.isDelete(kv)) {
          if (delete == null) {
            delete = new Delete(key.get());
          }
          delete.add(kv);
        } else {
          if (put == null) {
            put = new Put(key.get());
          }
          addPutToKv(put, kv);
        }
      }
      if (put != null) {
        if (durability != null) {
          put.setDurability(durability);
        }
        put.setClusterIds(clusterIds);
        context.write(key, put);
      }
      if (delete != null) {
        if (durability != null) {
          delete.setDurability(durability);
        }
        delete.setClusterIds(clusterIds);
        context.write(key, delete);
      }
    }

    protected void addPutToKv(Put put, Cell kv) throws IOException {
      put.add(kv);
    }

    @Override
    public void setup(Context context) {
      LOG.info("Setting up " + getClass() + " mapper.");
      Configuration conf = context.getConfiguration();
      cfRenameMap = createCfRenameMap(conf);
      filter = instantiateFilter(conf);
      String durabilityStr = conf.get(WAL_DURABILITY);
      if (durabilityStr != null) {
        durability = Durability.valueOf(durabilityStr.toUpperCase(Locale.ROOT));
        LOG.info("setting WAL durability to " + durability);
      } else {
        LOG.info("setting WAL durability to default.");
      }
      // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
      ZKWatcher zkw = null;
      Exception ex = null;
      try {
        zkw = new ZKWatcher(conf, context.getTaskAttemptID().toString(), null);
        clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
      } catch (ZooKeeperConnectionException e) {
        ex = e;
        LOG.error("Problem connecting to ZooKeeper during task setup", e);
      } catch (KeeperException e) {
        ex = e;
        LOG.error("Problem reading ZooKeeper data during task setup", e);
      } catch (IOException e) {
        ex = e;
        LOG.error("Problem setting up task", e);
      } finally {
        if (zkw != null) zkw.close();
      }
      if (clusterIds == null) {
        // exit early if setup fails
        throw new RuntimeException(ex);
      }
    }
  }

  /**
   * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) so that
   * some of them can optionally be excluded from the job output.
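   * <p>
   * A filter is typically supplied on the command line, for example (an illustrative sketch using
   * an arbitrary filter class and argument):
   *
   * <pre>
   * -Dimport.filter.class=org.apache.hadoop.hbase.filter.PrefixFilter
   * -Dimport.filter.args=row-prefix
   * </pre>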
   * @param conf {@link Configuration} from which to load the filter
   * @return the filter to use for the task, or <tt>null</tt> if no filter should be used
   * @throws IllegalArgumentException if the filter is misconfigured
   */
  public static Filter instantiateFilter(Configuration conf) {
    // get the filter, if it was configured
    Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
    if (filterClass == null) {
      LOG.debug("No configured filter class, accepting all keyvalues.");
      return null;
    }
    LOG.debug("Attempting to create filter:" + filterClass);
    String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
    ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
    try {
      Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
      return (Filter) m.invoke(null, quotedArgs);
    } catch (IllegalAccessException | SecurityException | NoSuchMethodException
      | IllegalArgumentException | InvocationTargetException e) {
      LOG.error("Couldn't instantiate filter!", e);
      throw new RuntimeException(e);
    }
  }

  private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
    ArrayList<byte[]> quotedArgs = new ArrayList<>();
    for (String stringArg : stringArgs) {
      // all the filters' instantiation methods expect quoted args since they normally come from
      // the shell, so add the quotes here, though it shouldn't really be needed :-/
      quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
    }
    return quotedArgs;
  }

  /**
   * Attempt to filter out the keyvalue
   * @param filter the {@link Filter} to apply, may be <tt>null</tt>
   * @param c      {@link Cell} on which to apply the filter
   * @return <tt>null</tt> if the key should not be written, otherwise returns the original
   *         {@link Cell}
   */
  public static ExtendedCell filterKv(Filter filter, ExtendedCell c) throws IOException {
    // apply the filter and skip this kv if the filter doesn't apply
    if (filter != null) {
      Filter.ReturnCode code = filter.filterCell(c);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Filter returned:" + code + " for the cell:" + c);
      }
      // if it's not an accept type, then skip this kv
      if (
        !(code.equals(Filter.ReturnCode.INCLUDE)
          || code.equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))
      ) {
        return null;
      }
    }
    return c;
  }

  // helper: create a new KeyValue based on CF rename map
  private static ExtendedCell convertKv(ExtendedCell kv, Map<byte[], byte[]> cfRenameMap) {
    if (cfRenameMap != null) {
      // If there's a rename mapping for this CF, create a new KeyValue
      byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
      if (newCfName != null) {
        List<Tag> tags = PrivateCellUtil.getTags(kv);
        kv = new KeyValue(kv.getRowArray(), // row buffer
          kv.getRowOffset(), // row offset
          kv.getRowLength(), // row length
          newCfName, // CF buffer
          0, // CF offset
          newCfName.length, // CF length
          kv.getQualifierArray(), // qualifier buffer
          kv.getQualifierOffset(), // qualifier offset
          kv.getQualifierLength(), // qualifier length
          kv.getTimestamp(), // timestamp
          KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
          kv.getValueArray(), // value buffer
          kv.getValueOffset(), // value offset
          kv.getValueLength(), // value length
          tags.size() == 0 ? null : tags);
      }
    }
    return kv;
  }

  // helper: make a map from sourceCfName to destCfName by parsing a config key
  private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
    Map<byte[], byte[]> cfRenameMap = null;
    String allMappingsPropVal = conf.get(CF_RENAME_PROP);
    if (allMappingsPropVal != null) {
      // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
      String[] allMappings = allMappingsPropVal.split(",");
      for (String mapping : allMappings) {
        if (cfRenameMap == null) {
          cfRenameMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        }
        String[] srcAndDest = mapping.split(":");
        if (srcAndDest.length != 2) {
          continue;
        }
        cfRenameMap.put(Bytes.toBytes(srcAndDest[0]), Bytes.toBytes(srcAndDest[1]));
      }
    }
    return cfRenameMap;
  }

  /**
   * <p>
   * Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells the mapper
   * how to rename column families.
   * <p>
   * Alternately, instead of calling this function, you could set the configuration key
   * {@link #CF_RENAME_PROP} yourself. The value should look like
   *
   * <pre>
   * srcCf1:destCf1,srcCf2:destCf2,....
   * </pre>
   *
   * This has the same effect on the mapper behavior.
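   * <p>
   * For example (a minimal sketch; the family names are illustrative):
   *
   * <pre>
   * Map&lt;String, String&gt; renameMap = new HashMap&lt;&gt;();
   * renameMap.put("oldCf", "newCf");
   * Import.configureCfRenaming(conf, renameMap);
   * </pre>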
   * @param conf      the Configuration in which the {@link #CF_RENAME_PROP} key will be set
   * @param renameMap a mapping from source CF names to destination CF names
   */
  static public void configureCfRenaming(Configuration conf, Map<String, String> renameMap) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> entry : renameMap.entrySet()) {
      String sourceCf = entry.getKey();
      String destCf = entry.getValue();

      if (
        sourceCf.contains(":") || sourceCf.contains(",") || destCf.contains(":")
          || destCf.contains(",")
      ) {
        throw new IllegalArgumentException(
          "Illegal character in CF names: " + sourceCf + ", " + destCf);
      }

      if (sb.length() != 0) {
        sb.append(",");
      }
      sb.append(sourceCf + ":" + destCf);
    }
    conf.set(CF_RENAME_PROP, sb.toString());
  }

  /**
   * Add a Filter to be instantiated on import
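   * <p>
   * For example (an illustrative sketch using an arbitrary filter class and argument):
   *
   * <pre>
   * Import.addFilterAndArguments(conf, PrefixFilter.class, Arrays.asList("row-prefix"));
   * </pre>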
   * @param conf       Configuration to update (will be passed to the job)
   * @param clazz      {@link Filter} subclass to instantiate on the server.
   * @param filterArgs List of arguments to pass to the filter on instantiation
   */
  public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
    List<String> filterArgs) throws IOException {
    conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
    conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
  }

  /**
   * Sets up the actual job.
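   * <p>
   * The expected arguments are the table name followed by the input directory, e.g. (an
   * illustrative sketch):
   *
   * <pre>
   * Job job = Import.createSubmittableJob(conf, new String[] { "mytable", "/export/output" });
   * job.waitForCompletion(true);
   * </pre>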
   * @param conf The current configuration.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
    TableName tableName = TableName.valueOf(args[0]);
    conf.set(TABLE_NAME, tableName.getNameAsString());
    Path inputDir = new Path(args[1]);
    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(Importer.class);
    FileInputFormat.setInputPaths(job, inputDir);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);

    // make sure we get the filter in the jars
    try {
      Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
      if (filter != null) {
        TableMapReduceUtil.addDependencyJarsForClasses(conf, filter);
      }
    } catch (Exception e) {
      throw new IOException(e);
    }

    if (hfileOutPath != null && conf.getBoolean(HAS_LARGE_RESULT, false)) {
      LOG.info("Using large-result mode for bulk output.");
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
        job.setMapperClass(CellSortImporter.class);
        job.setReducerClass(CellReducer.class);
        Path outputDir = new Path(hfileOutPath);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(CellWritableComparable.class);
        job.setMapOutputValueClass(MapReduceExtendedCell.class);
        job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class",
          CellWritableComparable.CellWritableComparator.class, RawComparator.class);
        Path partitionsPath =
          new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
        FileSystem fs = FileSystem.get(job.getConfiguration());
        fs.deleteOnExit(partitionsPath);
        job.setPartitionerClass(CellWritableComparablePartitioner.class);
        job.setNumReduceTasks(regionLocator.getStartKeys().length);
        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
          org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
      }
    } else if (hfileOutPath != null) {
      LOG.info("writing to hfiles for bulk load.");
      job.setMapperClass(CellImporter.class);
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        job.setReducerClass(CellSortReducer.class);
        Path outputDir = new Path(hfileOutPath);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(MapReduceExtendedCell.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
          org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
      }
    } else {
      LOG.info("writing directly to table from Mapper.");
      // No reducers. Just write straight to table. Call initTableReducerJob
      // because it sets up the TableOutputFormat.
      job.setMapperClass(Importer.class);
      TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
      job.setNumReduceTasks(0);
    }
    return job;
  }

  /*
   * @param errorMsg Error message. Can be null.
   */
  private static void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: Import [options] <tablename> <inputdir>");
    System.err.println("By default Import will load data directly into HBase. To instead generate");
    System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
    System.err.println("If the result set is very large and includes too many Cells, which can"
      + " cause an OOME due to the in-memory sort in the reducer, pass the option:");
    System.err.println("  -D" + HAS_LARGE_RESULT + "=true");
    System.err
      .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
    System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
    System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter>");
    System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
      + CF_RENAME_PROP + " property. Further, filters will only use the"
      + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
      + " whether the current row needs to be ignored completely for processing and "
      + " Filter#filterCell(Cell) method to determine if the Cell should be added;"
      + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
      + " the Cell.");
    System.err.println("To import data exported from HBase 0.94, use");
    System.err.println("  -Dhbase.import.version=0.94");
    System.err.println("  -D" + JOB_NAME_CONF_KEY
      + "=jobName - use the specified mapreduce job name for the import");
    System.err.println("For performance consider the following options:\n"
      + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false\n"
      + "  -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
      + " Allowed values are the supported durability values"
      + " like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
  }

  /**
   * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase, we
   * need to flush all the regions of the table, as the data is held in memory and is not present
   * in the Write Ahead Log to replay in case of a crash. This method flushes all the regions of
   * the table when data is imported to hbase with {@link Durability#SKIP_WAL}.
   */
  public static void flushRegionsIfNecessary(Configuration conf)
    throws IOException, InterruptedException {
    String tableName = conf.get(TABLE_NAME);
    Admin hAdmin = null;
    Connection connection = null;
    String durability = conf.get(WAL_DURABILITY);
    // Need to flush if the data is written to hbase and skip wal is enabled.
    if (
      conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
        && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)
    ) {
      LOG.info("Flushing all data that skipped the WAL.");
      try {
        connection = ConnectionFactory.createConnection(conf);
        hAdmin = connection.getAdmin();
        hAdmin.flush(TableName.valueOf(tableName));
      } finally {
        if (hAdmin != null) {
          hAdmin.close();
        }
        if (connection != null) {
          connection.close();
        }
      }
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      usage("Wrong number of arguments: " + args.length);
      return -1;
    }
    String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
    if (inputVersionString != null) {
      getConf().set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
    }
    Job job = createSubmittableJob(getConf(), args);
    boolean isJobSuccessful = job.waitForCompletion(true);
    if (isJobSuccessful) {
      // Flush all the regions of the table
      flushRegionsIfNecessary(getConf());
    }
    long inputRecords = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
    long outputRecords = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
    if (outputRecords < inputRecords) {
      System.err.println("Warning, not all records were imported (maybe filtered out).");
      if (outputRecords == 0) {
        System.err.println("If the data was exported from HBase 0.94 "
          + "consider using -Dhbase.import.version=0.94.");
      }
    }

    return (isJobSuccessful ? 0 : 1);
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int errCode = ToolRunner.run(HBaseConfiguration.create(), new Import(), args);
    System.exit(errCode);
  }

}