018package org.apache.hadoop.hbase.snapshot;
020import java.io.BufferedInputStream;
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.io.InputStream;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.Comparator;
029import java.util.LinkedList;
030import java.util.List;
031import java.util.concurrent.ExecutionException;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.Executors;
034import java.util.concurrent.Future;
035import java.util.function.BiConsumer;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FSDataInputStream;
038import org.apache.hadoop.fs.FSDataOutputStream;
039import org.apache.hadoop.fs.FileChecksum;
040import org.apache.hadoop.fs.FileStatus;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.fs.permission.FsPermission;
044import org.apache.hadoop.hbase.HBaseConfiguration;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.io.FileLink;
049import org.apache.hadoop.hbase.io.HFileLink;
050import org.apache.hadoop.hbase.io.WALLink;
051import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
052import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
053import org.apache.hadoop.hbase.mob.MobUtils;
054import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
055import org.apache.hadoop.hbase.util.AbstractHBaseTool;
056import org.apache.hadoop.hbase.util.CommonFSUtils;
057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
058import org.apache.hadoop.hbase.util.FSUtils;
059import org.apache.hadoop.hbase.util.HFileArchiveUtil;
060import org.apache.hadoop.hbase.util.Pair;
061import org.apache.hadoop.io.BytesWritable;
062import org.apache.hadoop.io.NullWritable;
063import org.apache.hadoop.io.Writable;
064import org.apache.hadoop.mapreduce.InputFormat;
065import org.apache.hadoop.mapreduce.InputSplit;
066import org.apache.hadoop.mapreduce.Job;
067import org.apache.hadoop.mapreduce.JobContext;
068import org.apache.hadoop.mapreduce.Mapper;
069import org.apache.hadoop.mapreduce.RecordReader;
070import org.apache.hadoop.mapreduce.TaskAttemptContext;
071import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
072import org.apache.hadoop.mapreduce.security.TokenCache;
073import org.apache.hadoop.util.StringUtils;
074import org.apache.hadoop.util.Tool;
075import org.apache.yetus.audience.InterfaceAudience;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
079import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
080import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
082import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
088 * Export the specified snapshot to a given FileSystem. The .snapshot/name folder is copied to the
089 * destination cluster and then all the hfiles/wals are copied using a Map-Reduce Job in the
090 * .archive/ location. When everything is done, the second cluster can restore the snapshot.
091 */
093public class ExportSnapshot extends AbstractHBaseTool implements Tool {
094  public static final String NAME = "exportsnapshot";
095  /** Configuration prefix for overrides for the source filesystem */
096  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
097  /** Configuration prefix for overrides for the destination filesystem */
098  public static final String CONF_DEST_PREFIX = NAME + ".to.";
100  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);
102  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
103  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
104  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
105  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
106  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
107  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
108  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
109  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
110  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
111  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
112  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
113  private static final String CONF_REPORT_SIZE = "snapshot.export.report.size";
114  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
115  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
116  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
117  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
118  private static final String CONF_COPY_MANIFEST_THREADS =
119    "snapshot.export.copy.references.threads";
120  private static final int DEFAULT_COPY_MANIFEST_THREADS =
121    Runtime.getRuntime().availableProcessors();
123  static class Testing {
124    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
125    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
126    int failuresCountToInject = 0;
127    int injectedFailureCount = 0;
128  }
130  // Command line options and defaults.
131  static final class Options {
132    static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore.");
133    static final Option TARGET_NAME =
134      new Option(null, "target", true, "Target name for the snapshot.");
135    static final Option COPY_TO =
136      new Option(null, "copy-to", true, "Remote " + "destination hdfs://");
137    static final Option COPY_FROM =
138      new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)");
139    static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
140      "Do not verify checksum, use name+length only.");
141    static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
142      "Do not verify the exported snapshot's expiration status and integrity.");
143    static final Option NO_SOURCE_VERIFY = new Option(null, "no-source-verify", false,
144      "Do not verify the source snapshot's expiration status and integrity.");
145    static final Option OVERWRITE =
146      new Option(null, "overwrite", false, "Rewrite the snapshot manifest if already exists.");
147    static final Option CHUSER =
148      new Option(null, "chuser", true, "Change the owner of the files to the specified one.");
149    static final Option CHGROUP =
150      new Option(null, "chgroup", true, "Change the group of the files to the specified one.");
151    static final Option CHMOD =
152      new Option(null, "chmod", true, "Change the permission of the files to the specified one.");
153    static final Option MAPPERS = new Option(null, "mappers", true,
154      "Number of mappers to use during the copy (mapreduce.job.maps).");
155    static final Option BANDWIDTH =
156      new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second.");
157    static final Option RESET_TTL =
158      new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot");
159  }
161  // Export Map-Reduce Counters, to keep track of the progress
162  public enum Counter {
170  }
172  /**
173   * Indicates the checksum comparison result.
174   */
175  public enum ChecksumComparison {
176    TRUE, // checksum comparison is compatible and true.
177    FALSE, // checksum comparison is compatible and false.
178    INCOMPATIBLE, // checksum comparison is not compatible.
179  }
181  private static class ExportMapper
182    extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
183    private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
184    final static int REPORT_SIZE = 1 * 1024 * 1024;
185    final static int BUFFER_SIZE = 64 * 1024;
187    private boolean verifyChecksum;
188    private String filesGroup;
189    private String filesUser;
190    private short filesMode;
191    private int bufferSize;
192    private int reportSize;
194    private FileSystem outputFs;
195    private Path outputArchive;
196    private Path outputRoot;
198    private FileSystem inputFs;
199    private Path inputArchive;
200    private Path inputRoot;
202    private static Testing testing = new Testing();
204    @Override
205    public void setup(Context context) throws IOException {
206      Configuration conf = context.getConfiguration();
208      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
209      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
211      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
213      filesGroup = conf.get(CONF_FILES_GROUP);
214      filesUser = conf.get(CONF_FILES_USER);
215      filesMode = (short) conf.getInt(CONF_FILES_MODE, 0);
216      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
217      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
219      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
220      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
222      try {
223        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
224      } catch (IOException e) {
225        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
226      }
228      try {
229        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
230      } catch (IOException e) {
231        throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e);
232      }
234      // Use the default block size of the outputFs if bigger
235      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
236      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
237      LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
238      reportSize = conf.getInt(CONF_REPORT_SIZE, REPORT_SIZE);
240      for (Counter c : Counter.values()) {
241        context.getCounter(c).increment(0);
242      }
243      if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
244        testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
245        // Get number of times we have already injected failure based on attempt number of this
246        // task.
247        testing.injectedFailureCount = context.getTaskAttemptID().getId();
248      }
249    }
251    @Override
252    public void map(BytesWritable key, NullWritable value, Context context)
253      throws InterruptedException, IOException {
254      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
255      Path outputPath = getOutputPath(inputInfo);
257      copyFile(context, inputInfo, outputPath);
258    }
260    /**
261     * Returns the location where the inputPath will be copied.
262     */
263    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
264      Path path = null;
265      switch (inputInfo.getType()) {
266        case HFILE:
267          Path inputPath = new Path(inputInfo.getHfile());
268          String family = inputPath.getParent().getName();
269          TableName table = HFileLink.getReferencedTableName(inputPath.getName());
270          String region = HFileLink.getReferencedRegionName(inputPath.getName());
271          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
272          path = new Path(CommonFSUtils.getTableDir(new Path("./"), table),
273            new Path(region, new Path(family, hfile)));
274          break;
275        case WAL:
276          LOG.warn("snapshot does not keeps WALs: " + inputInfo);
277          break;
278        default:
279          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
280      }
281      return new Path(outputArchive, path);
282    }
284    @SuppressWarnings("checkstyle:linelength")
285    /**
286     * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in
287     * {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}.
288     */
289    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
290      throws IOException {
291      if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return;
292      if (testing.injectedFailureCount >= testing.failuresCountToInject) return;
293      testing.injectedFailureCount++;
294      context.getCounter(Counter.COPY_FAILED).increment(1);
295      LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
296      throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s",
297        testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
298    }
300    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
301      final Path outputPath) throws IOException {
302      // Get the file information
303      FileStatus inputStat = getSourceFileStatus(context, inputInfo);
305      // Verify if the output file exists and is the same that we want to copy
306      if (outputFs.exists(outputPath)) {
307        FileStatus outputStat = outputFs.getFileStatus(outputPath);
308        if (outputStat != null && sameFile(inputStat, outputStat)) {
309          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
310          context.getCounter(Counter.FILES_SKIPPED).increment(1);
311          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
312          return;
313        }
314      }
316      InputStream in = openSourceFile(context, inputInfo);
317      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
318      if (Integer.MAX_VALUE != bandwidthMB) {
319        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
320      }
322      Path inputPath = inputStat.getPath();
323      try {
324        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
326        // Ensure that the output folder is there and copy the file
327        createOutputPath(outputPath.getParent());
328        FSDataOutputStream out = outputFs.create(outputPath, true);
330        long stime = EnvironmentEdgeManager.currentTime();
331        long totalBytesWritten =
332          copyData(context, inputPath, in, outputPath, out, inputStat.getLen());
334        // Verify the file length and checksum
335        verifyCopyResult(inputStat, outputFs.getFileStatus(outputPath));
337        long etime = EnvironmentEdgeManager.currentTime();
338        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
339        LOG
340          .info("size=" + totalBytesWritten + " (" + StringUtils.humanReadableInt(totalBytesWritten)
341            + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String
342              .format(" %.3fM/sec", (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0));
343        context.getCounter(Counter.FILES_COPIED).increment(1);
345        // Try to Preserve attributes
346        if (!preserveAttributes(outputPath, inputStat)) {
347          LOG.warn("You may have to run manually chown on: " + outputPath);
348        }
349      } catch (IOException e) {
350        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
351        context.getCounter(Counter.COPY_FAILED).increment(1);
352        throw e;
353      } finally {
354        injectTestFailure(context, inputInfo);
355      }
356    }
358    /**
359     * Create the output folder and optionally set ownership.
360     */
361    private void createOutputPath(final Path path) throws IOException {
362      if (filesUser == null && filesGroup == null) {
363        outputFs.mkdirs(path);
364      } else {
365        Path parent = path.getParent();
366        if (!outputFs.exists(parent) && !parent.isRoot()) {
367          createOutputPath(parent);
368        }
369        outputFs.mkdirs(path);
370        if (filesUser != null || filesGroup != null) {
371          // override the owner when non-null user/group is specified
372          outputFs.setOwner(path, filesUser, filesGroup);
373        }
374        if (filesMode > 0) {
375          outputFs.setPermission(path, new FsPermission(filesMode));
376        }
377      }
378    }
380    /**
381     * Try to Preserve the files attribute selected by the user copying them from the source file
382     * This is only required when you are exporting as a different user than "hbase" or on a system
383     * that doesn't have the "hbase" user. This is not considered a blocking failure since the user
384     * can force a chmod with the user that knows is available on the system.
385     */
386    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
387      FileStatus stat;
388      try {
389        stat = outputFs.getFileStatus(path);
390      } catch (IOException e) {
391        LOG.warn("Unable to get the status for file=" + path);
392        return false;
393      }
395      try {
396        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
397          outputFs.setPermission(path, new FsPermission(filesMode));
398        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
399          outputFs.setPermission(path, refStat.getPermission());
400        }
401      } catch (IOException e) {
402        LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage());
403        return false;
404      }
406      boolean hasRefStat = (refStat != null);
407      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
408      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
409      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
410        try {
411          if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
412            outputFs.setOwner(path, user, group);
413          }
414        } catch (IOException e) {
415          LOG.warn(
416            "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage());
417          LOG.warn("The user/group may not exist on the destination cluster: user=" + user
418            + " group=" + group);
419          return false;
420        }
421      }
423      return true;
424    }
426    private boolean stringIsNotEmpty(final String str) {
427      return str != null && str.length() > 0;
428    }
430    private long copyData(final Context context, final Path inputPath, final InputStream in,
431      final Path outputPath, final FSDataOutputStream out, final long inputFileSize)
432      throws IOException {
433      final String statusMessage =
434        "copied %s/" + StringUtils.humanReadableInt(inputFileSize) + " (%.1f%%)";
436      try {
437        byte[] buffer = new byte[bufferSize];
438        long totalBytesWritten = 0;
439        int reportBytes = 0;
440        int bytesRead;
442        while ((bytesRead = in.read(buffer)) > 0) {
443          out.write(buffer, 0, bytesRead);
444          totalBytesWritten += bytesRead;
445          reportBytes += bytesRead;
447          if (reportBytes >= reportSize) {
448            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
449            context.setStatus(
450              String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten),
451                (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath
452                + " to " + outputPath);
453            reportBytes = 0;
454          }
455        }
457        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
458        context
459          .setStatus(String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten),
460            (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to "
461            + outputPath);
463        return totalBytesWritten;
464      } finally {
465        out.close();
466        in.close();
467      }
468    }
470    /**
471     * Try to open the "source" file. Throws an IOException if the communication with the inputFs
472     * fail or if the file is not found.
473     */
474    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
475      throws IOException {
476      try {
477        Configuration conf = context.getConfiguration();
478        FileLink link = null;
479        switch (fileInfo.getType()) {
480          case HFILE:
481            Path inputPath = new Path(fileInfo.getHfile());
482            link = getFileLink(inputPath, conf);
483            break;
484          case WAL:
485            String serverName = fileInfo.getWalServer();
486            String logName = fileInfo.getWalName();
487            link = new WALLink(inputRoot, serverName, logName);
488            break;
489          default:
490            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
491        }
492        return link.open(inputFs);
493      } catch (IOException e) {
494        context.getCounter(Counter.MISSING_FILES).increment(1);
495        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
496        throw e;
497      }
498    }
500    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
501      throws IOException {
502      try {
503        Configuration conf = context.getConfiguration();
504        FileLink link = null;
505        switch (fileInfo.getType()) {
506          case HFILE:
507            Path inputPath = new Path(fileInfo.getHfile());
508            link = getFileLink(inputPath, conf);
509            break;
510          case WAL:
511            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
512            break;
513          default:
514            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
515        }
516        return link.getFileStatus(inputFs);
517      } catch (FileNotFoundException e) {
518        context.getCounter(Counter.MISSING_FILES).increment(1);
519        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
520        throw e;
521      } catch (IOException e) {
522        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
523        throw e;
524      }
525    }
527    private FileLink getFileLink(Path path, Configuration conf) throws IOException {
528      String regionName = HFileLink.getReferencedRegionName(path.getName());
529      TableName tableName = HFileLink.getReferencedTableName(path.getName());
530      if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
531        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
532          HFileArchiveUtil.getArchivePath(conf), path);
533      }
534      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
535    }
537    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
538      try {
539        return fs.getFileChecksum(path);
540      } catch (IOException e) {
541        LOG.warn("Unable to get checksum for file=" + path, e);
542        return null;
543      }
544    }
546    /**
547     * Utility to compare the file length and checksums for the paths specified.
548     */
549    private void verifyCopyResult(final FileStatus inputStat, final FileStatus outputStat)
550      throws IOException {
551      long inputLen = inputStat.getLen();
552      long outputLen = outputStat.getLen();
553      Path inputPath = inputStat.getPath();
554      Path outputPath = outputStat.getPath();
556      if (inputLen != outputLen) {
557        throw new IOException("Mismatch in length of input:" + inputPath + " (" + inputLen
558          + ") and output:" + outputPath + " (" + outputLen + ")");
559      }
561      // If length==0, we will skip checksum
562      if (inputLen != 0 && verifyChecksum) {
563        FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
564        FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
566        ChecksumComparison checksumComparison = verifyChecksum(inChecksum, outChecksum);
567        if (!checksumComparison.equals(ChecksumComparison.TRUE)) {
568          StringBuilder errMessage = new StringBuilder("Checksum mismatch between ")
569            .append(inputPath).append(" and ").append(outputPath).append(".");
571          boolean addSkipHint = false;
572          String inputScheme = inputFs.getScheme();
573          String outputScheme = outputFs.getScheme();
574          if (!inputScheme.equals(outputScheme)) {
575            errMessage.append(" Input and output filesystems are of different types.\n")
576              .append("Their checksum algorithms may be incompatible.");
577            addSkipHint = true;
578          } else if (inputStat.getBlockSize() != outputStat.getBlockSize()) {
579            errMessage.append(" Input and output differ in block-size.");
580            addSkipHint = true;
581          } else if (
582            inChecksum != null && outChecksum != null
583              && !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
584          ) {
585            errMessage.append(" Input and output checksum algorithms are of different types.");
586            addSkipHint = true;
587          }
588          if (addSkipHint) {
589            errMessage
590              .append(" You can choose file-level checksum validation via "
591                + "-Ddfs.checksum.combine.mode=COMPOSITE_CRC when block-sizes"
592                + " or filesystems are different.\n")
593              .append(" Or you can skip checksum-checks altogether with -no-checksum-verify,")
594              .append(
595                " for the table backup scenario, you should use -i option to skip checksum-checks.\n")
596              .append(" (NOTE: By skipping checksums, one runs the risk of "
597                + "masking data-corruption during file-transfer.)\n");
598          }
599          throw new IOException(errMessage.toString());
600        }
601      }
602    }
604    /**
605     * Utility to compare checksums
606     */
607    private ChecksumComparison verifyChecksum(final FileChecksum inChecksum,
608      final FileChecksum outChecksum) {
609      // If the input or output checksum is null, or the algorithms of input and output are not
610      // equal, that means there is no comparison
611      // and return not compatible. else if matched, return compatible with the matched result.
612      if (
613        inChecksum == null || outChecksum == null
614          || !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
615      ) {
616        return ChecksumComparison.INCOMPATIBLE;
617      } else if (inChecksum.equals(outChecksum)) {
618        return ChecksumComparison.TRUE;
619      }
620      return ChecksumComparison.FALSE;
621    }
623    /**
624     * Check if the two files are equal by looking at the file length, and at the checksum (if user
625     * has specified the verifyChecksum flag).
626     */
627    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
628      // Not matching length
629      if (inputStat.getLen() != outputStat.getLen()) return false;
631      // Mark files as equals, since user asked for no checksum verification
632      if (!verifyChecksum) return true;
634      // If checksums are not available, files are not the same.
635      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
636      if (inChecksum == null) return false;
638      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
639      if (outChecksum == null) return false;
641      return inChecksum.equals(outChecksum);
642    }
643  }
645  // ==========================================================================
646  // Input Format
647  // ==========================================================================
649  /**
650   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
651   * @return list of files referenced by the snapshot (pair of path and size)
652   */
653  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
654    final FileSystem fs, final Path snapshotDir) throws IOException {
655    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
657    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
658    final TableName table = TableName.valueOf(snapshotDesc.getTable());
660    // Get snapshot files
661    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
662    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
663      new SnapshotReferenceUtil.SnapshotVisitor() {
664        @Override
665        public void storeFile(final RegionInfo regionInfo, final String family,
666          final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
667          Pair<SnapshotFileInfo, Long> snapshotFileAndSize = null;
668          if (!storeFile.hasReference()) {
669            String region = regionInfo.getEncodedName();
670            String hfile = storeFile.getName();
671            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, region, family, hfile,
672              storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
673          } else {
674            Pair<String, String> referredToRegionAndFile =
675              StoreFileInfo.getReferredToRegionAndFile(storeFile.getName());
676            String referencedRegion = referredToRegionAndFile.getFirst();
677            String referencedHFile = referredToRegionAndFile.getSecond();
678            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, referencedRegion, family,
679              referencedHFile, storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
680          }
681          files.add(snapshotFileAndSize);
682        }
683      });
685    return files;
686  }
688  private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs,
689    Configuration conf, TableName table, String region, String family, String hfile, long size)
690    throws IOException {
691    Path path = HFileLink.createPath(table, region, family, hfile);
692    SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
693      .setHfile(path.toString()).build();
694    if (size == -1) {
695      size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
696    }
697    return new Pair<>(fileInfo, size);
698  }
700  /**
701   * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
702   * The groups created will have similar amounts of bytes.
703   * <p>
704   * The algorithm used is pretty straightforward; the file list is sorted by size, and then each
705   * group fetch the bigger file available, iterating through groups alternating the direction.
706   */
707  static List<List<Pair<SnapshotFileInfo, Long>>>
708    getBalancedSplits(final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
709    // Sort files by size, from small to big
710    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
711      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
712        long r = a.getSecond() - b.getSecond();
713        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
714      }
715    });
717    // create balanced groups
718    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
719    long[] sizeGroups = new long[ngroups];
720    int hi = files.size() - 1;
721    int lo = 0;
723    List<Pair<SnapshotFileInfo, Long>> group;
724    int dir = 1;
725    int g = 0;
727    while (hi >= lo) {
728      if (g == fileGroups.size()) {
729        group = new LinkedList<>();
730        fileGroups.add(group);
731      } else {
732        group = fileGroups.get(g);
733      }
735      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
737      // add the hi one
738      sizeGroups[g] += fileInfo.getSecond();
739      group.add(fileInfo);
741      // change direction when at the end or the beginning
742      g += dir;
743      if (g == ngroups) {
744        dir = -1;
745        g = ngroups - 1;
746      } else if (g < 0) {
747        dir = 1;
748        g = 0;
749      }
750    }
752    if (LOG.isDebugEnabled()) {
753      for (int i = 0; i < sizeGroups.length; ++i) {
754        LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
755      }
756    }
758    return fileGroups;
759  }
761  private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
762    @Override
763    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
764      TaskAttemptContext tac) throws IOException, InterruptedException {
765      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys());
766    }
768    @Override
769    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
770      Configuration conf = context.getConfiguration();
771      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
772      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
774      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
775      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
776      if (mappers == 0 && snapshotFiles.size() > 0) {
777        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
778        mappers = Math.min(mappers, snapshotFiles.size());
779        conf.setInt(CONF_NUM_SPLITS, mappers);
780        conf.setInt(MR_NUM_MAPS, mappers);
781      }
783      List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
784      List<InputSplit> splits = new ArrayList(groups.size());
785      for (List<Pair<SnapshotFileInfo, Long>> files : groups) {
786        splits.add(new ExportSnapshotInputSplit(files));
787      }
788      return splits;
789    }
791    private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
792      private List<Pair<BytesWritable, Long>> files;
793      private long length;
795      public ExportSnapshotInputSplit() {
796        this.files = null;
797      }
799      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
800        this.files = new ArrayList(snapshotFiles.size());
801        for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) {
802          this.files.add(
803            new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
804          this.length += fileInfo.getSecond();
805        }
806      }
808      private List<Pair<BytesWritable, Long>> getSplitKeys() {
809        return files;
810      }
812      @Override
813      public long getLength() throws IOException, InterruptedException {
814        return length;
815      }
817      @Override
818      public String[] getLocations() throws IOException, InterruptedException {
819        return new String[] {};
820      }
822      @Override
823      public void readFields(DataInput in) throws IOException {
824        int count = in.readInt();
825        files = new ArrayList<>(count);
826        length = 0;
827        for (int i = 0; i < count; ++i) {
828          BytesWritable fileInfo = new BytesWritable();
829          fileInfo.readFields(in);
830          long size = in.readLong();
831          files.add(new Pair<>(fileInfo, size));
832          length += size;
833        }
834      }
836      @Override
837      public void write(DataOutput out) throws IOException {
838        out.writeInt(files.size());
839        for (final Pair<BytesWritable, Long> fileInfo : files) {
840          fileInfo.getFirst().write(out);
841          out.writeLong(fileInfo.getSecond());
842        }
843      }
844    }
846    private static class ExportSnapshotRecordReader
847      extends RecordReader<BytesWritable, NullWritable> {
848      private final List<Pair<BytesWritable, Long>> files;
849      private long totalSize = 0;
850      private long procSize = 0;
851      private int index = -1;
853      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
854        this.files = files;
855        for (Pair<BytesWritable, Long> fileInfo : files) {
856          totalSize += fileInfo.getSecond();
857        }
858      }
860      @Override
861      public void close() {
862      }
864      @Override
865      public BytesWritable getCurrentKey() {
866        return files.get(index).getFirst();
867      }
869      @Override
870      public NullWritable getCurrentValue() {
871        return NullWritable.get();
872      }
874      @Override
875      public float getProgress() {
876        return (float) procSize / totalSize;
877      }
879      @Override
880      public void initialize(InputSplit split, TaskAttemptContext tac) {
881      }
883      @Override
884      public boolean nextKeyValue() {
885        if (index >= 0) {
886          procSize += files.get(index).getSecond();
887        }
888        return (++index < files.size());
889      }
890    }
891  }
893  // ==========================================================================
894  // Tool
895  // ==========================================================================
897  /**
898   * Run Map-Reduce Job to perform the files copy.
899   */
900  private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName,
901    final Path snapshotDir, final boolean verifyChecksum, final String filesUser,
902    final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB)
903    throws IOException, InterruptedException, ClassNotFoundException {
904    Configuration conf = getConf();
905    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
906    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
907    if (mappers > 0) {
908      conf.setInt(CONF_NUM_SPLITS, mappers);
909      conf.setInt(MR_NUM_MAPS, mappers);
910    }
911    conf.setInt(CONF_FILES_MODE, filesMode);
912    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
913    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
914    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
915    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
916    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
917    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
919    String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
920    Job job = new Job(conf);
921    job.setJobName(jobname);
922    job.setJarByClass(ExportSnapshot.class);
923    TableMapReduceUtil.addDependencyJars(job);
924    job.setMapperClass(ExportMapper.class);
925    job.setInputFormatClass(ExportSnapshotInputFormat.class);
926    job.setOutputFormatClass(NullOutputFormat.class);
927    job.setMapSpeculativeExecution(false);
928    job.setNumReduceTasks(0);
930    // Acquire the delegation Tokens
931    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
932    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf);
933    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
934    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf);
936    // Run the MR Job
937    if (!job.waitForCompletion(true)) {
938      throw new ExportSnapshotException(job.getStatus().getFailureInfo());
939    }
940  }
942  private void verifySnapshot(final SnapshotDescription snapshotDesc, final Configuration baseConf,
943    final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
944    // Update the conf with the current root dir, since may be a different cluster
945    Configuration conf = new Configuration(baseConf);
946    CommonFSUtils.setRootDir(conf, rootDir);
947    CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf));
948    boolean isExpired = SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDesc.getTtl(),
949      snapshotDesc.getCreationTime(), EnvironmentEdgeManager.currentTime());
950    if (isExpired) {
951      throw new SnapshotTTLExpiredException(ProtobufUtil.createSnapshotDesc(snapshotDesc));
952    }
953    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
954  }
956  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
957    BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
958    ExecutorService pool = Executors
960    List<Future<Void>> futures = new ArrayList<>();
961    for (Path dstPath : traversedPath) {
962      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
963      futures.add(future);
964    }
965    try {
966      for (Future<Void> future : futures) {
967        future.get();
968      }
969    } catch (InterruptedException | ExecutionException e) {
970      throw new IOException(e);
971    } finally {
972      pool.shutdownNow();
973    }
974  }
976  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
977    Configuration conf, List<Path> traversedPath) throws IOException {
978    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
979      try {
980        fs.setOwner(path, filesUser, filesGroup);
981      } catch (IOException e) {
982        throw new RuntimeException(
983          "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e);
984      }
985    }, conf);
986  }
988  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
989    final List<Path> traversedPath, final Configuration conf) throws IOException {
990    if (filesMode <= 0) {
991      return;
992    }
993    FsPermission perm = new FsPermission(filesMode);
994    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
995      try {
996        fs.setPermission(path, perm);
997      } catch (IOException e) {
998        throw new RuntimeException(
999          "set permission for file " + path + " to " + filesMode + " failed", e);
1000      }
1001    }, conf);
1002  }
1004  private boolean verifyTarget = true;
1005  private boolean verifySource = true;
1006  private boolean verifyChecksum = true;
1007  private String snapshotName = null;
1008  private String targetName = null;
1009  private boolean overwrite = false;
1010  private String filesGroup = null;
1011  private String filesUser = null;
1012  private Path outputRoot = null;
1013  private Path inputRoot = null;
1014  private int bandwidthMB = Integer.MAX_VALUE;
1015  private int filesMode = 0;
1016  private int mappers = 0;
1017  private boolean resetTtl = false;
1019  @Override
1020  protected void processOptions(CommandLine cmd) {
1021    snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName);
1022    targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName);
1023    if (cmd.hasOption(Options.COPY_TO.getLongOpt())) {
1024      outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt()));
1025    }
1026    if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
1027      inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
1028    }
1029    mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
1030    filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
1031    filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
1032    filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8);
1033    bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB);
1034    overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt());
1035    // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...).
1036    verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
1037    verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
1038    verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt());
1039    resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt());
1040  }
1042  /**
1043   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
1044   * @return 0 on success, and != 0 upon failure.
1045   */
1046  @Override
1047  public int doWork() throws IOException {
1048    Configuration conf = getConf();
1050    // Check user options
1051    if (snapshotName == null) {
1052      System.err.println("Snapshot name not provided.");
1053      LOG.error("Use -h or --help for usage instructions.");
1054      return EXIT_FAILURE;
1055    }
1057    if (outputRoot == null) {
1058      System.err
1059        .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided.");
1060      LOG.error("Use -h or --help for usage instructions.");
1061      return EXIT_FAILURE;
1062    }
1064    if (targetName == null) {
1065      targetName = snapshotName;
1066    }
1067    if (inputRoot == null) {
1068      inputRoot = CommonFSUtils.getRootDir(conf);
1069    } else {
1070      CommonFSUtils.setRootDir(conf, inputRoot);
1071    }
1073    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
1074    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
1075    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
1076    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
1077    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false)
1078      || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null;
1079    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
1080    Path snapshotTmpDir =
1081      SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf);
1082    Path outputSnapshotDir =
1083      SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
1084    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
1085    LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot);
1086    LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs,
1087      outputRoot.toString(), skipTmp, initialOutputSnapshotDir);
1089    // throw CorruptedSnapshotException if we can't read the snapshot info.
1090    SnapshotDescription sourceSnapshotDesc =
1091      SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir);
1093    // Verify snapshot source before copying files
1094    if (verifySource) {
1095      LOG.info("Verify the source snapshot's expiration status and integrity.");
1096      verifySnapshot(sourceSnapshotDesc, srcConf, inputFs, inputRoot, snapshotDir);
1097    }
1099    // Find the necessary directory which need to change owner and group
1100    Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
1101    if (outputFs.exists(needSetOwnerDir)) {
1102      if (skipTmp) {
1103        needSetOwnerDir = outputSnapshotDir;
1104      } else {
1105        needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf);
1106        if (outputFs.exists(needSetOwnerDir)) {
1107          needSetOwnerDir = snapshotTmpDir;
1108        }
1109      }
1110    }
1112    // Check if the snapshot already exists
1113    if (outputFs.exists(outputSnapshotDir)) {
1114      if (overwrite) {
1115        if (!outputFs.delete(outputSnapshotDir, true)) {
1116          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
1117          return EXIT_FAILURE;
1118        }
1119      } else {
1120        System.err.println("The snapshot '" + targetName + "' already exists in the destination: "
1121          + outputSnapshotDir);
1122        return EXIT_FAILURE;
1123      }
1124    }
1126    if (!skipTmp) {
1127      // Check if the snapshot already in-progress
1128      if (outputFs.exists(snapshotTmpDir)) {
1129        if (overwrite) {
1130          if (!outputFs.delete(snapshotTmpDir, true)) {
1131            System.err
1132              .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir);
1133            return EXIT_FAILURE;
1134          }
1135        } else {
1136          System.err
1137            .println("A snapshot with the same name '" + targetName + "' may be in-progress");
1138          System.err
1139            .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, ");
1140          System.err
1141            .println("consider removing " + snapshotTmpDir + " by using the -overwrite option");
1142          return EXIT_FAILURE;
1143        }
1144      }
1145    }
1147    // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
1148    // The snapshot references must be copied before the hfiles otherwise the cleaner
1149    // will remove them because they are unreferenced.
1150    List<Path> travesedPaths = new ArrayList<>();
1151    boolean copySucceeded = false;
1152    try {
1153      LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir);
1154      travesedPaths =
1155        FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
1157      copySucceeded = true;
1158    } catch (IOException e) {
1159      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir
1160        + " to=" + initialOutputSnapshotDir, e);
1161    } finally {
1162      if (copySucceeded) {
1163        if (filesUser != null || filesGroup != null) {
1164          LOG.warn(
1165            (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser)
1166              + (filesGroup == null
1167                ? ""
1168                : ", Change the group of " + needSetOwnerDir + " to " + filesGroup));
1169          setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths);
1170        }
1171        if (filesMode > 0) {
1172          LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
1173          setPermissionParallel(outputFs, (short) filesMode, travesedPaths, conf);
1174        }
1175      }
1176    }
1178    // Write a new .snapshotinfo if the target name is different from the source name or we want to
1179    // reset TTL for target snapshot.
1180    if (!targetName.equals(snapshotName) || resetTtl) {
1181      SnapshotDescription.Builder snapshotDescBuilder =
1182        SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder();
1183      if (!targetName.equals(snapshotName)) {
1184        snapshotDescBuilder.setName(targetName);
1185      }
1186      if (resetTtl) {
1187        snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL);
1188      }
1189      SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(),
1190        initialOutputSnapshotDir, outputFs);
1191      if (filesUser != null || filesGroup != null) {
1192        outputFs.setOwner(
1193          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser,
1194          filesGroup);
1195      }
1196      if (filesMode > 0) {
1197        outputFs.setPermission(
1198          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE),
1199          new FsPermission((short) filesMode));
1200      }
1201    }
1203    // Step 2 - Start MR Job to copy files
1204    // The snapshot references must be copied before the files otherwise the files gets removed
1205    // by the HFileArchiver, since they have no references.
1206    try {
1207      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser,
1208        filesGroup, filesMode, mappers, bandwidthMB);
1210      LOG.info("Finalize the Snapshot Export");
1211      if (!skipTmp) {
1212        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
1213        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1214          throw new ExportSnapshotException("Unable to rename snapshot directory from="
1215            + snapshotTmpDir + " to=" + outputSnapshotDir);
1216        }
1217      }
1219      // Step 4 - Verify snapshot integrity
1220      if (verifyTarget) {
1221        LOG.info("Verify the exported snapshot's expiration status and integrity.");
1222        SnapshotDescription targetSnapshotDesc =
1223          SnapshotDescriptionUtils.readSnapshotInfo(outputFs, outputSnapshotDir);
1224        verifySnapshot(targetSnapshotDesc, destConf, outputFs, outputRoot, outputSnapshotDir);
1225      }
1227      LOG.info("Export Completed: " + targetName);
1228      return EXIT_SUCCESS;
1229    } catch (Exception e) {
1230      LOG.error("Snapshot export failed", e);
1231      if (!skipTmp) {
1232        outputFs.delete(snapshotTmpDir, true);
1233      }
1234      outputFs.delete(outputSnapshotDir, true);
1235      return EXIT_FAILURE;
1236    }
1237  }
1239  @Override
1240  protected void printUsage() {
1241    super.printUsage();
1242    System.out.println("\n" + "Examples:\n" + "  hbase snapshot export \\\n"
1243      + "    --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n"
1244      + "    --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" + "\n"
1245      + "  hbase snapshot export \\\n"
1246      + "    --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n"
1247      + "    --copy-to hdfs://srv1:50070/hbase");
1248  }
1250  @Override
1251  protected void addOptions() {
1252    addRequiredOption(Options.SNAPSHOT);
1253    addOption(Options.COPY_TO);
1254    addOption(Options.COPY_FROM);
1255    addOption(Options.TARGET_NAME);
1256    addOption(Options.NO_CHECKSUM_VERIFY);
1257    addOption(Options.NO_TARGET_VERIFY);
1258    addOption(Options.NO_SOURCE_VERIFY);
1259    addOption(Options.OVERWRITE);
1260    addOption(Options.CHUSER);
1261    addOption(Options.CHGROUP);
1262    addOption(Options.CHMOD);
1263    addOption(Options.MAPPERS);
1264    addOption(Options.BANDWIDTH);
1265    addOption(Options.RESET_TTL);
1266  }
1268  public static void main(String[] args) {
1269    new ExportSnapshot().doStaticMain(args);
1270  }