001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.IOException;
021import java.io.InputStreamReader;
022import java.io.OutputStreamWriter;
023import java.security.MessageDigest;
024import java.security.NoSuchAlgorithmException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.List;
028import java.util.Properties;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.conf.Configured;
031import org.apache.hadoop.fs.FSDataInputStream;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Connection;
039import org.apache.hadoop.hbase.client.ConnectionFactory;
040import org.apache.hadoop.hbase.client.Result;
041import org.apache.hadoop.hbase.client.Scan;
042import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.util.Pair;
045import org.apache.hadoop.io.MapFile;
046import org.apache.hadoop.io.NullWritable;
047import org.apache.hadoop.io.SequenceFile;
048import org.apache.hadoop.mapreduce.Job;
049import org.apache.hadoop.mapreduce.Reducer;
050import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
051import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
052import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
053import org.apache.hadoop.util.GenericOptionsParser;
054import org.apache.hadoop.util.Tool;
055import org.apache.hadoop.util.ToolRunner;
056import org.apache.yetus.audience.InterfaceAudience;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
061import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
062import org.apache.hbase.thirdparty.com.google.common.collect.Ordering;
063
064@InterfaceAudience.Private
065public class HashTable extends Configured implements Tool {
066
067  private static final Logger LOG = LoggerFactory.getLogger(HashTable.class);
068
069  private static final int DEFAULT_BATCH_SIZE = 8000;
070
071  private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size";
072  final static String PARTITIONS_FILE_NAME = "partitions";
073  final static String MANIFEST_FILE_NAME = "manifest";
074  final static String HASH_DATA_DIR = "hashes";
075  final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
076  final static String IGNORE_TIMESTAMPS = "ignoreTimestamps";
077  private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";
078
079  TableHash tableHash = new TableHash();
080  Path destPath;
081
082  public HashTable(Configuration conf) {
083    super(conf);
084  }
085
086  public static class TableHash {
087
088    Path hashDir;
089
090    String tableName;
091    String families = null;
092    long batchSize = DEFAULT_BATCH_SIZE;
093    int numHashFiles = 0;
094    byte[] startRow = HConstants.EMPTY_START_ROW;
095    byte[] stopRow = HConstants.EMPTY_END_ROW;
096    int scanBatch = 0;
097    int versions = -1;
098    long startTime = 0;
099    long endTime = 0;
100    boolean ignoreTimestamps;
101    boolean rawScan;
102
103    List<ImmutableBytesWritable> partitions;
104
105    public static TableHash read(Configuration conf, Path hashDir) throws IOException {
106      TableHash tableHash = new TableHash();
107      FileSystem fs = hashDir.getFileSystem(conf);
108      tableHash.hashDir = hashDir;
109      tableHash.readPropertiesFile(fs, new Path(hashDir, MANIFEST_FILE_NAME));
110      tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME));
111      return tableHash;
112    }
113
114    void writePropertiesFile(FileSystem fs, Path path) throws IOException {
115      Properties p = new Properties();
116      p.setProperty("table", tableName);
117      if (families != null) {
118        p.setProperty("columnFamilies", families);
119      }
120      p.setProperty("targetBatchSize", Long.toString(batchSize));
121      p.setProperty("numHashFiles", Integer.toString(numHashFiles));
122      if (!isTableStartRow(startRow)) {
123        p.setProperty("startRowHex", Bytes.toHex(startRow));
124      }
125      if (!isTableEndRow(stopRow)) {
126        p.setProperty("stopRowHex", Bytes.toHex(stopRow));
127      }
128      if (scanBatch > 0) {
129        p.setProperty("scanBatch", Integer.toString(scanBatch));
130      }
131      if (versions >= 0) {
132        p.setProperty("versions", Integer.toString(versions));
133      }
134      if (startTime != 0) {
135        p.setProperty("startTimestamp", Long.toString(startTime));
136      }
137      if (endTime != 0) {
138        p.setProperty("endTimestamp", Long.toString(endTime));
139      }
140      p.setProperty("rawScan", Boolean.toString(rawScan));
141
142      try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) {
143        p.store(osw, null);
144      }
145    }
146
147    void readPropertiesFile(FileSystem fs, Path path) throws IOException {
148      Properties p = new Properties();
149      try (FSDataInputStream in = fs.open(path)) {
150        try (InputStreamReader isr = new InputStreamReader(in, Charsets.UTF_8)) {
151          p.load(isr);
152        }
153      }
154      tableName = p.getProperty("table");
155      families = p.getProperty("columnFamilies");
156      batchSize = Long.parseLong(p.getProperty("targetBatchSize"));
157      numHashFiles = Integer.parseInt(p.getProperty("numHashFiles"));
158
159      String startRowHex = p.getProperty("startRowHex");
160      if (startRowHex != null) {
161        startRow = Bytes.fromHex(startRowHex);
162      }
163      String stopRowHex = p.getProperty("stopRowHex");
164      if (stopRowHex != null) {
165        stopRow = Bytes.fromHex(stopRowHex);
166      }
167
168      String scanBatchString = p.getProperty("scanBatch");
169      if (scanBatchString != null) {
170        scanBatch = Integer.parseInt(scanBatchString);
171      }
172
173      String versionString = p.getProperty("versions");
174      if (versionString != null) {
175        versions = Integer.parseInt(versionString);
176      }
177
178      String rawScanString = p.getProperty("rawScan");
179      if (rawScanString != null) {
180        rawScan = Boolean.parseBoolean(rawScanString);
181      }
182
183      String startTimeString = p.getProperty("startTimestamp");
184      if (startTimeString != null) {
185        startTime = Long.parseLong(startTimeString);
186      }
187
188      String endTimeString = p.getProperty("endTimestamp");
189      if (endTimeString != null) {
190        endTime = Long.parseLong(endTimeString);
191      }
192    }
193
194    Scan initScan() throws IOException {
195      Scan scan = new Scan();
196      scan.setCacheBlocks(false);
197      if (startTime != 0 || endTime != 0) {
198        scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
199      }
200      if (scanBatch > 0) {
201        scan.setBatch(scanBatch);
202      }
203      if (versions >= 0) {
204        scan.setMaxVersions(versions);
205      }
206      if (!isTableStartRow(startRow)) {
207        scan.setStartRow(startRow);
208      }
209      if (!isTableEndRow(stopRow)) {
210        scan.setStopRow(stopRow);
211      }
212      if (families != null) {
213        for (String fam : families.split(",")) {
214          scan.addFamily(Bytes.toBytes(fam));
215        }
216      }
217      scan.setRaw(rawScan);
218
219      return scan;
220    }
221
222    /**
223     * Choose partitions between row ranges to hash to a single output file Selects region
224     * boundaries that fall within the scan range, and groups them into the desired number of
225     * partitions.
226     */
227    void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) {
228      List<byte[]> startKeys = new ArrayList<>();
229      for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) {
230        byte[] regionStartKey = regionStartEndKeys.getFirst()[i];
231        byte[] regionEndKey = regionStartEndKeys.getSecond()[i];
232
233        // if scan begins after this region, or starts before this region, then drop this region
234        // in other words:
235        // IF (scan begins before the end of this region
236        // AND scan ends before the start of this region)
237        // THEN include this region
238        if (
239          (isTableStartRow(startRow) || isTableEndRow(regionEndKey)
240            || Bytes.compareTo(startRow, regionEndKey) < 0)
241            && (isTableEndRow(stopRow) || isTableStartRow(regionStartKey)
242              || Bytes.compareTo(stopRow, regionStartKey) > 0)
243        ) {
244          startKeys.add(regionStartKey);
245        }
246      }
247
248      int numRegions = startKeys.size();
249      if (numHashFiles == 0) {
250        numHashFiles = numRegions / 100;
251      }
252      if (numHashFiles == 0) {
253        numHashFiles = 1;
254      }
255      if (numHashFiles > numRegions) {
256        // can't partition within regions
257        numHashFiles = numRegions;
258      }
259
260      // choose a subset of start keys to group regions into ranges
261      partitions = new ArrayList<>(numHashFiles - 1);
262      // skip the first start key as it is not a partition between ranges.
263      for (long i = 1; i < numHashFiles; i++) {
264        int splitIndex = (int) (numRegions * i / numHashFiles);
265        partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex)));
266      }
267    }
268
269    void writePartitionFile(Configuration conf, Path path) throws IOException {
270      FileSystem fs = path.getFileSystem(conf);
271      @SuppressWarnings("deprecation")
272      SequenceFile.Writer writer =
273        SequenceFile.createWriter(fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);
274
275      for (int i = 0; i < partitions.size(); i++) {
276        writer.append(partitions.get(i), NullWritable.get());
277      }
278      writer.close();
279    }
280
281    private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
282      throws IOException {
283      @SuppressWarnings("deprecation")
284      SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
285      ImmutableBytesWritable key = new ImmutableBytesWritable();
286      partitions = new ArrayList<>();
287      while (reader.next(key)) {
288        partitions.add(new ImmutableBytesWritable(key.copyBytes()));
289      }
290      reader.close();
291
292      if (!Ordering.natural().isOrdered(partitions)) {
293        throw new IOException("Partitions are not ordered!");
294      }
295    }
296
297    @Override
298    public String toString() {
299      StringBuilder sb = new StringBuilder();
300      sb.append("tableName=").append(tableName);
301      if (families != null) {
302        sb.append(", families=").append(families);
303      }
304      sb.append(", batchSize=").append(batchSize);
305      sb.append(", numHashFiles=").append(numHashFiles);
306      if (!isTableStartRow(startRow)) {
307        sb.append(", startRowHex=").append(Bytes.toHex(startRow));
308      }
309      if (!isTableEndRow(stopRow)) {
310        sb.append(", stopRowHex=").append(Bytes.toHex(stopRow));
311      }
312      if (scanBatch >= 0) {
313        sb.append(", scanBatch=").append(scanBatch);
314      }
315      if (versions >= 0) {
316        sb.append(", versions=").append(versions);
317      }
318      sb.append(", rawScan=").append(rawScan);
319      if (startTime != 0) {
320        sb.append("startTime=").append(startTime);
321      }
322      if (endTime != 0) {
323        sb.append("endTime=").append(endTime);
324      }
325      return sb.toString();
326    }
327
328    static String getDataFileName(int hashFileIndex) {
329      return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex);
330    }
331
332    /**
333     * Open a TableHash.Reader starting at the first hash at or after the given key.
334     */
335    public Reader newReader(Configuration conf, ImmutableBytesWritable startKey)
336      throws IOException {
337      return new Reader(conf, startKey);
338    }
339
340    public class Reader implements java.io.Closeable {
341      private final Configuration conf;
342
343      private int hashFileIndex;
344      private MapFile.Reader mapFileReader;
345
346      private boolean cachedNext;
347      private ImmutableBytesWritable key;
348      private ImmutableBytesWritable hash;
349
350      Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException {
351        this.conf = conf;
352        int partitionIndex = Collections.binarySearch(partitions, startKey);
353        if (partitionIndex >= 0) {
354          // if the key is equal to a partition, then go the file after that partition
355          hashFileIndex = partitionIndex + 1;
356        } else {
357          // if the key is between partitions, then go to the file between those partitions
358          hashFileIndex = -1 - partitionIndex;
359        }
360        openHashFile();
361
362        // MapFile's don't make it easy to seek() so that the subsequent next() returns
363        // the desired key/value pair. So we cache it for the first call of next().
364        hash = new ImmutableBytesWritable();
365        key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash);
366        if (key == null) {
367          cachedNext = false;
368          hash = null;
369        } else {
370          cachedNext = true;
371        }
372      }
373
374      /**
375       * Read the next key/hash pair. Returns true if such a pair exists and false when at the end
376       * of the data.
377       */
378      public boolean next() throws IOException {
379        if (cachedNext) {
380          cachedNext = false;
381          return true;
382        }
383        key = new ImmutableBytesWritable();
384        hash = new ImmutableBytesWritable();
385        while (true) {
386          boolean hasNext = mapFileReader.next(key, hash);
387          if (hasNext) {
388            return true;
389          }
390          hashFileIndex++;
391          if (hashFileIndex < TableHash.this.numHashFiles) {
392            mapFileReader.close();
393            openHashFile();
394          } else {
395            key = null;
396            hash = null;
397            return false;
398          }
399        }
400      }
401
402      /**
403       * Get the current key
404       * @return the current key or null if there is no current key
405       */
406      public ImmutableBytesWritable getCurrentKey() {
407        return key;
408      }
409
410      /**
411       * Get the current hash
412       * @return the current hash or null if there is no current hash
413       */
414      public ImmutableBytesWritable getCurrentHash() {
415        return hash;
416      }
417
418      private void openHashFile() throws IOException {
419        if (mapFileReader != null) {
420          mapFileReader.close();
421        }
422        Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR);
423        Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex));
424        mapFileReader = new MapFile.Reader(dataFile, conf);
425      }
426
427      @Override
428      public void close() throws IOException {
429        mapFileReader.close();
430      }
431    }
432  }
433
434  static boolean isTableStartRow(byte[] row) {
435    return Bytes.equals(HConstants.EMPTY_START_ROW, row);
436  }
437
438  static boolean isTableEndRow(byte[] row) {
439    return Bytes.equals(HConstants.EMPTY_END_ROW, row);
440  }
441
442  public Job createSubmittableJob(String[] args) throws IOException {
443    Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
444    generatePartitions(partitionsPath);
445
446    Job job = Job.getInstance(getConf(),
447      getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
448    Configuration jobConf = job.getConfiguration();
449    jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
450    jobConf.setBoolean(IGNORE_TIMESTAMPS, tableHash.ignoreTimestamps);
451    job.setJarByClass(HashTable.class);
452
453    TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
454      HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
455
456    // use a TotalOrderPartitioner and reducers to group region output into hash files
457    job.setPartitionerClass(TotalOrderPartitioner.class);
458    TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
459    job.setReducerClass(Reducer.class); // identity reducer
460    job.setNumReduceTasks(tableHash.numHashFiles);
461    job.setOutputKeyClass(ImmutableBytesWritable.class);
462    job.setOutputValueClass(ImmutableBytesWritable.class);
463    job.setOutputFormatClass(MapFileOutputFormat.class);
464    FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));
465
466    return job;
467  }
468
469  private void generatePartitions(Path partitionsPath) throws IOException {
470    Connection connection = ConnectionFactory.createConnection(getConf());
471    Pair<byte[][], byte[][]> regionKeys =
472      connection.getRegionLocator(TableName.valueOf(tableHash.tableName)).getStartEndKeys();
473    connection.close();
474
475    tableHash.selectPartitions(regionKeys);
476    LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath);
477
478    tableHash.writePartitionFile(getConf(), partitionsPath);
479  }
480
481  static class ResultHasher {
482    private MessageDigest digest;
483
484    private boolean batchStarted = false;
485    private ImmutableBytesWritable batchStartKey;
486    private ImmutableBytesWritable batchHash;
487    private long batchSize = 0;
488    boolean ignoreTimestamps;
489
490    public ResultHasher() {
491      try {
492        digest = MessageDigest.getInstance("MD5");
493      } catch (NoSuchAlgorithmException e) {
494        Throwables.propagate(e);
495      }
496    }
497
498    public void startBatch(ImmutableBytesWritable row) {
499      if (batchStarted) {
500        throw new RuntimeException("Cannot start new batch without finishing existing one.");
501      }
502      batchStarted = true;
503      batchSize = 0;
504      batchStartKey = row;
505      batchHash = null;
506    }
507
508    public void hashResult(Result result) {
509      if (!batchStarted) {
510        throw new RuntimeException("Cannot add to batch that has not been started.");
511      }
512      for (Cell cell : result.rawCells()) {
513        int rowLength = cell.getRowLength();
514        int familyLength = cell.getFamilyLength();
515        int qualifierLength = cell.getQualifierLength();
516        int valueLength = cell.getValueLength();
517        digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength);
518        digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
519        digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength);
520
521        if (!ignoreTimestamps) {
522          long ts = cell.getTimestamp();
523          for (int i = 8; i > 0; i--) {
524            digest.update((byte) ts);
525            ts >>>= 8;
526          }
527        }
528        digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength);
529
530        batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength;
531      }
532    }
533
534    public void finishBatch() {
535      if (!batchStarted) {
536        throw new RuntimeException("Cannot finish batch that has not started.");
537      }
538      batchStarted = false;
539      batchHash = new ImmutableBytesWritable(digest.digest());
540    }
541
542    public boolean isBatchStarted() {
543      return batchStarted;
544    }
545
546    public ImmutableBytesWritable getBatchStartKey() {
547      return batchStartKey;
548    }
549
550    public ImmutableBytesWritable getBatchHash() {
551      return batchHash;
552    }
553
554    public long getBatchSize() {
555      return batchSize;
556    }
557  }
558
559  public static class HashMapper
560    extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
561
562    private ResultHasher hasher;
563    private long targetBatchSize;
564
565    private ImmutableBytesWritable currentRow;
566
567    @Override
568    protected void setup(Context context) throws IOException, InterruptedException {
569      targetBatchSize =
570        context.getConfiguration().getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
571      hasher = new ResultHasher();
572      hasher.ignoreTimestamps = context.getConfiguration().getBoolean(IGNORE_TIMESTAMPS, false);
573      TableSplit split = (TableSplit) context.getInputSplit();
574      hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
575    }
576
577    @Override
578    protected void map(ImmutableBytesWritable key, Result value, Context context)
579      throws IOException, InterruptedException {
580
581      if (currentRow == null || !currentRow.equals(key)) {
582        currentRow = new ImmutableBytesWritable(key); // not immutable
583
584        if (hasher.getBatchSize() >= targetBatchSize) {
585          hasher.finishBatch();
586          context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
587          hasher.startBatch(currentRow);
588        }
589      }
590
591      hasher.hashResult(value);
592    }
593
594    @Override
595    protected void cleanup(Context context) throws IOException, InterruptedException {
596      hasher.finishBatch();
597      context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
598    }
599  }
600
601  private void writeTempManifestFile() throws IOException {
602    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
603    FileSystem fs = tempManifestPath.getFileSystem(getConf());
604    tableHash.writePropertiesFile(fs, tempManifestPath);
605  }
606
607  private void completeManifest() throws IOException {
608    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
609    Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME);
610    FileSystem fs = tempManifestPath.getFileSystem(getConf());
611    fs.rename(tempManifestPath, manifestPath);
612  }
613
614  private static final int NUM_ARGS = 2;
615
616  private static void printUsage(final String errorMsg) {
617    if (errorMsg != null && errorMsg.length() > 0) {
618      System.err.println("ERROR: " + errorMsg);
619      System.err.println();
620    }
621    System.err.println("Usage: HashTable [options] <tablename> <outputpath>");
622    System.err.println();
623    System.err.println("Options:");
624    System.err.println(" batchsize         the target amount of bytes to hash in each batch");
625    System.err.println("                   rows are added to the batch until this size is reached");
626    System.err.println("                   (defaults to " + DEFAULT_BATCH_SIZE + " bytes)");
627    System.err.println(" numhashfiles      the number of hash files to create");
628    System.err.println("                   if set to fewer than number of regions then");
629    System.err.println("                   the job will create this number of reducers");
630    System.err.println("                   (defaults to 1/100 of regions -- at least 1)");
631    System.err.println(" startrow          the start row");
632    System.err.println(" stoprow           the stop row");
633    System.err.println(" starttime         beginning of the time range (unixtime in millis)");
634    System.err.println("                   without endtime means from starttime to forever");
635    System.err.println(" endtime           end of the time range.");
636    System.err.println("                   Ignored if no starttime specified.");
637    System.err.println(" scanbatch         scanner batch size to support intra row scans");
638    System.err.println(" versions          number of cell versions to include");
639    System.err.println(" rawScan           performs a raw scan (false if omitted)");
640    System.err.println(" families          comma-separated list of families to include");
641    System.err.println(" ignoreTimestamps  if true, ignores cell timestamps");
642    System.err.println("                   when calculating hashes");
643    System.err.println();
644    System.err.println("Args:");
645    System.err.println(" tablename     Name of the table to hash");
646    System.err.println(" outputpath    Filesystem path to put the output data");
647    System.err.println();
648    System.err.println("Examples:");
649    System.err.println(" To hash 'TestTable' in 32kB batches for a 1 hour window into 50 files:");
650    System.err.println(" $ hbase "
651      + "org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50"
652      + " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3"
653      + " TestTable /hashes/testTable");
654  }
655
656  private boolean doCommandLine(final String[] args) {
657    if (args.length < NUM_ARGS) {
658      printUsage(null);
659      return false;
660    }
661    try {
662
663      tableHash.tableName = args[args.length - 2];
664      destPath = new Path(args[args.length - 1]);
665
666      for (int i = 0; i < args.length - NUM_ARGS; i++) {
667        String cmd = args[i];
668        if (cmd.equals("-h") || cmd.startsWith("--h")) {
669          printUsage(null);
670          return false;
671        }
672
673        final String batchSizeArgKey = "--batchsize=";
674        if (cmd.startsWith(batchSizeArgKey)) {
675          tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length()));
676          continue;
677        }
678
679        final String numHashFilesArgKey = "--numhashfiles=";
680        if (cmd.startsWith(numHashFilesArgKey)) {
681          tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length()));
682          continue;
683        }
684
685        final String startRowArgKey = "--startrow=";
686        if (cmd.startsWith(startRowArgKey)) {
687          tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length()));
688          continue;
689        }
690
691        final String stopRowArgKey = "--stoprow=";
692        if (cmd.startsWith(stopRowArgKey)) {
693          tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length()));
694          continue;
695        }
696
697        final String startTimeArgKey = "--starttime=";
698        if (cmd.startsWith(startTimeArgKey)) {
699          tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
700          continue;
701        }
702
703        final String endTimeArgKey = "--endtime=";
704        if (cmd.startsWith(endTimeArgKey)) {
705          tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
706          continue;
707        }
708
709        final String scanBatchArgKey = "--scanbatch=";
710        if (cmd.startsWith(scanBatchArgKey)) {
711          tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length()));
712          continue;
713        }
714
715        final String versionsArgKey = "--versions=";
716        if (cmd.startsWith(versionsArgKey)) {
717          tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
718          continue;
719        }
720
721        final String rawScanArgKey = "--rawScan=";
722        if (cmd.startsWith(rawScanArgKey)) {
723          tableHash.rawScan = Boolean.parseBoolean(cmd.substring(rawScanArgKey.length()));
724          continue;
725        }
726
727        final String familiesArgKey = "--families=";
728        if (cmd.startsWith(familiesArgKey)) {
729          tableHash.families = cmd.substring(familiesArgKey.length());
730          continue;
731        }
732
733        final String ignoreTimestampsKey = "--ignoreTimestamps=";
734        if (cmd.startsWith(ignoreTimestampsKey)) {
735          tableHash.ignoreTimestamps =
736            Boolean.parseBoolean(cmd.substring(ignoreTimestampsKey.length()));
737          continue;
738        }
739
740        printUsage("Invalid argument '" + cmd + "'");
741        return false;
742      }
743      if (
744        (tableHash.startTime != 0 || tableHash.endTime != 0)
745          && (tableHash.startTime >= tableHash.endTime)
746      ) {
747        printUsage("Invalid time range filter: starttime=" + tableHash.startTime + " >=  endtime="
748          + tableHash.endTime);
749        return false;
750      }
751
752    } catch (Exception e) {
753      e.printStackTrace();
754      printUsage("Can't start because " + e.getMessage());
755      return false;
756    }
757    return true;
758  }
759
760  /**
761   * Main entry point.
762   */
763  public static void main(String[] args) throws Exception {
764    int ret = ToolRunner.run(new HashTable(HBaseConfiguration.create()), args);
765    System.exit(ret);
766  }
767
768  @Override
769  public int run(String[] args) throws Exception {
770    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
771    if (!doCommandLine(otherArgs)) {
772      return 1;
773    }
774
775    Job job = createSubmittableJob(otherArgs);
776    writeTempManifestFile();
777    if (!job.waitForCompletion(true)) {
778      LOG.info("Map-reduce job failed!");
779      return 1;
780    }
781    completeManifest();
782    return 0;
783  }
784
785}