001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.snapshot;
019
020import java.io.BufferedInputStream;
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.io.InputStream;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.Comparator;
029import java.util.HashSet;
030import java.util.LinkedList;
031import java.util.List;
032import java.util.Set;
033import java.util.concurrent.ExecutionException;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import java.util.concurrent.Future;
037import java.util.function.BiConsumer;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FSDataInputStream;
040import org.apache.hadoop.fs.FSDataOutputStream;
041import org.apache.hadoop.fs.FileChecksum;
042import org.apache.hadoop.fs.FileStatus;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.fs.permission.FsPermission;
046import org.apache.hadoop.hbase.HBaseConfiguration;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.client.RegionInfo;
050import org.apache.hadoop.hbase.io.FileLink;
051import org.apache.hadoop.hbase.io.HFileLink;
052import org.apache.hadoop.hbase.io.WALLink;
053import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
054import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
055import org.apache.hadoop.hbase.mob.MobUtils;
056import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
057import org.apache.hadoop.hbase.util.AbstractHBaseTool;
058import org.apache.hadoop.hbase.util.CommonFSUtils;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.util.FSUtils;
061import org.apache.hadoop.hbase.util.HFileArchiveUtil;
062import org.apache.hadoop.hbase.util.Pair;
063import org.apache.hadoop.io.BytesWritable;
064import org.apache.hadoop.io.NullWritable;
065import org.apache.hadoop.io.Writable;
066import org.apache.hadoop.mapreduce.InputFormat;
067import org.apache.hadoop.mapreduce.InputSplit;
068import org.apache.hadoop.mapreduce.Job;
069import org.apache.hadoop.mapreduce.JobContext;
070import org.apache.hadoop.mapreduce.Mapper;
071import org.apache.hadoop.mapreduce.RecordReader;
072import org.apache.hadoop.mapreduce.TaskAttemptContext;
073import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
074import org.apache.hadoop.mapreduce.security.TokenCache;
075import org.apache.hadoop.util.StringUtils;
076import org.apache.hadoop.util.Tool;
077import org.apache.yetus.audience.InterfaceAudience;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
082import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
083
084import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
086import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
087import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
088
089/**
090 * Export the specified snapshot to a given FileSystem. The .snapshot/name folder is copied to the
091 * destination cluster and then all the hfiles/wals are copied using a Map-Reduce Job in the
092 * .archive/ location. When everything is done, the second cluster can restore the snapshot.
093 */
094@InterfaceAudience.Public
095public class ExportSnapshot extends AbstractHBaseTool implements Tool {
  /** Tool name; also the root of the source/destination configuration override prefixes. */
  public static final String NAME = "exportsnapshot";
  /**
   * Configuration prefix for overrides for the source filesystem. Passed to
   * HBaseConfiguration.createClusterConf when the map task builds its source-side configuration.
   */
  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
  /**
   * Configuration prefix for overrides for the destination filesystem. Passed to
   * HBaseConfiguration.createClusterConf when the map task builds its destination-side
   * configuration.
   */
  public static final String CONF_DEST_PREFIX = NAME + ".to.";

  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);

  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
  // Owner / group applied to copied files and to directories created on the destination.
  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
  // Permission bits (read as an int, stored as a short); only applied when > 0.
  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
  // Whether copies are verified by checksum as well as length; the mapper defaults it to true.
  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
  // Destination / source root directories; the archive directory is resolved beneath each.
  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
  // Copy buffer size in bytes; the mapper defaults it to max(destination block size, 64 KB).
  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
  // Bytes copied between progress/counter updates; the mapper defaults it to 1 MB.
  private static final String CONF_REPORT_SIZE = "snapshot.export.report.size";
  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
  // Per-mapper read throttle in MB/s (mapper default 100); Integer.MAX_VALUE disables throttling.
  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
  // NOTE(review): the key says "references" while the constant says "manifest"; confirm which
  // is intended before renaming either.
  private static final String CONF_COPY_MANIFEST_THREADS =
    "snapshot.export.copy.references.threads";
  // Defaults to the number of processors available to this JVM.
  private static final int DEFAULT_COPY_MANIFEST_THREADS =
    Runtime.getRuntime().availableProcessors();
124
125  static class Testing {
126    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
127    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
128    int failuresCountToInject = 0;
129    int injectedFailureCount = 0;
130  }
131
132  // Command line options and defaults.
133  static final class Options {
134    static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore.");
135    static final Option TARGET_NAME =
136      new Option(null, "target", true, "Target name for the snapshot.");
137    static final Option COPY_TO =
138      new Option(null, "copy-to", true, "Remote " + "destination hdfs://");
139    static final Option COPY_FROM =
140      new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)");
141    static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
142      "Do not verify checksum, use name+length only.");
143    static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
144      "Do not verify the exported snapshot's expiration status and integrity.");
145    static final Option NO_SOURCE_VERIFY = new Option(null, "no-source-verify", false,
146      "Do not verify the source snapshot's expiration status and integrity.");
147    static final Option OVERWRITE =
148      new Option(null, "overwrite", false, "Rewrite the snapshot manifest if already exists.");
149    static final Option CHUSER =
150      new Option(null, "chuser", true, "Change the owner of the files to the specified one.");
151    static final Option CHGROUP =
152      new Option(null, "chgroup", true, "Change the group of the files to the specified one.");
153    static final Option CHMOD =
154      new Option(null, "chmod", true, "Change the permission of the files to the specified one.");
155    static final Option MAPPERS = new Option(null, "mappers", true,
156      "Number of mappers to use during the copy (mapreduce.job.maps).");
157    static final Option BANDWIDTH =
158      new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second.");
159    static final Option RESET_TTL =
160      new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot");
161  }
162
  /**
   * Export Map-Reduce counters, updated by the export mapper to keep track of the copy progress
   * and outcome of each file.
   */
  public enum Counter {
    /** Source file could not be opened, or its status lookup failed with "file not found". */
    MISSING_FILES,
    /** Files copied to the destination and verified (length and, if enabled, checksum). */
    FILES_COPIED,
    /** Files skipped because the destination already held a file judged identical. */
    FILES_SKIPPED,
    /** Copy attempts that failed with an IOException, including injected test failures. */
    COPY_FAILED,
    /** Sum of source file lengths for which a copy was started. */
    BYTES_EXPECTED,
    /** Sum of source file lengths that were skipped as already present. */
    BYTES_SKIPPED,
    /** Bytes actually written to the destination, reported incrementally during the copy. */
    BYTES_COPIED
  }
173
  /**
   * Indicates the checksum comparison result.
   * <p>
   * {@link #INCOMPATIBLE} is produced when either checksum is unavailable (null) or the two
   * checksums were computed with different algorithms, so no meaningful comparison is possible.
   */
  public enum ChecksumComparison {
    TRUE, // checksum comparison is compatible and true (same algorithm, equal checksums).
    FALSE, // checksum comparison is compatible and false (same algorithm, different checksums).
    INCOMPATIBLE, // checksum comparison is not compatible (missing checksum or algorithm mismatch).
  }
182
183  private static class ExportMapper
184    extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
185    private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
186    final static int REPORT_SIZE = 1 * 1024 * 1024;
187    final static int BUFFER_SIZE = 64 * 1024;
188
189    private boolean verifyChecksum;
190    private String filesGroup;
191    private String filesUser;
192    private short filesMode;
193    private int bufferSize;
194    private int reportSize;
195
196    private FileSystem outputFs;
197    private Path outputArchive;
198    private Path outputRoot;
199
200    private FileSystem inputFs;
201    private Path inputArchive;
202    private Path inputRoot;
203
204    private static Testing testing = new Testing();
205
206    @Override
207    public void setup(Context context) throws IOException {
208      Configuration conf = context.getConfiguration();
209
210      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
211      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
212
213      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
214
215      filesGroup = conf.get(CONF_FILES_GROUP);
216      filesUser = conf.get(CONF_FILES_USER);
217      filesMode = (short) conf.getInt(CONF_FILES_MODE, 0);
218      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
219      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
220
221      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
222      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
223
224      try {
225        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
226      } catch (IOException e) {
227        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
228      }
229
230      try {
231        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
232      } catch (IOException e) {
233        throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e);
234      }
235
236      // Use the default block size of the outputFs if bigger
237      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
238      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
239      LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
240      reportSize = conf.getInt(CONF_REPORT_SIZE, REPORT_SIZE);
241
242      for (Counter c : Counter.values()) {
243        context.getCounter(c).increment(0);
244      }
245      if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
246        testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
247        // Get number of times we have already injected failure based on attempt number of this
248        // task.
249        testing.injectedFailureCount = context.getTaskAttemptID().getId();
250      }
251    }
252
253    @Override
254    public void map(BytesWritable key, NullWritable value, Context context)
255      throws InterruptedException, IOException {
256      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
257      Path outputPath = getOutputPath(inputInfo);
258
259      copyFile(context, inputInfo, outputPath);
260    }
261
262    /**
263     * Returns the location where the inputPath will be copied.
264     */
265    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
266      Path path = null;
267      switch (inputInfo.getType()) {
268        case HFILE:
269          Path inputPath = new Path(inputInfo.getHfile());
270          String family = inputPath.getParent().getName();
271          TableName table = HFileLink.getReferencedTableName(inputPath.getName());
272          String region = HFileLink.getReferencedRegionName(inputPath.getName());
273          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
274          path = new Path(CommonFSUtils.getTableDir(new Path("./"), table),
275            new Path(region, new Path(family, hfile)));
276          break;
277        case WAL:
278          LOG.warn("snapshot does not keeps WALs: " + inputInfo);
279          break;
280        default:
281          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
282      }
283      return new Path(outputArchive, path);
284    }
285
286    @SuppressWarnings("checkstyle:linelength")
287    /**
288     * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in
289     * {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}.
290     */
291    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
292      throws IOException {
293      if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return;
294      if (testing.injectedFailureCount >= testing.failuresCountToInject) return;
295      testing.injectedFailureCount++;
296      context.getCounter(Counter.COPY_FAILED).increment(1);
297      LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
298      throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s",
299        testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
300    }
301
302    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
303      final Path outputPath) throws IOException {
304      // Get the file information
305      FileStatus inputStat = getSourceFileStatus(context, inputInfo);
306
307      // Verify if the output file exists and is the same that we want to copy
308      if (outputFs.exists(outputPath)) {
309        FileStatus outputStat = outputFs.getFileStatus(outputPath);
310        if (outputStat != null && sameFile(inputStat, outputStat)) {
311          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
312          context.getCounter(Counter.FILES_SKIPPED).increment(1);
313          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
314          return;
315        }
316      }
317
318      InputStream in = openSourceFile(context, inputInfo);
319      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
320      if (Integer.MAX_VALUE != bandwidthMB) {
321        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
322      }
323
324      Path inputPath = inputStat.getPath();
325      try {
326        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
327
328        // Ensure that the output folder is there and copy the file
329        createOutputPath(outputPath.getParent());
330        FSDataOutputStream out = outputFs.create(outputPath, true);
331
332        long stime = EnvironmentEdgeManager.currentTime();
333        long totalBytesWritten =
334          copyData(context, inputPath, in, outputPath, out, inputStat.getLen());
335
336        // Verify the file length and checksum
337        verifyCopyResult(inputStat, outputFs.getFileStatus(outputPath));
338
339        long etime = EnvironmentEdgeManager.currentTime();
340        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
341        LOG
342          .info("size=" + totalBytesWritten + " (" + StringUtils.humanReadableInt(totalBytesWritten)
343            + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String
344              .format(" %.3fM/sec", (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0));
345        context.getCounter(Counter.FILES_COPIED).increment(1);
346
347        // Try to Preserve attributes
348        if (!preserveAttributes(outputPath, inputStat)) {
349          LOG.warn("You may have to run manually chown on: " + outputPath);
350        }
351      } catch (IOException e) {
352        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
353        context.getCounter(Counter.COPY_FAILED).increment(1);
354        throw e;
355      } finally {
356        injectTestFailure(context, inputInfo);
357      }
358    }
359
360    /**
361     * Create the output folder and optionally set ownership.
362     */
363    private void createOutputPath(final Path path) throws IOException {
364      if (filesUser == null && filesGroup == null) {
365        outputFs.mkdirs(path);
366      } else {
367        Path parent = path.getParent();
368        if (!outputFs.exists(parent) && !parent.isRoot()) {
369          createOutputPath(parent);
370        }
371        outputFs.mkdirs(path);
372        if (filesUser != null || filesGroup != null) {
373          // override the owner when non-null user/group is specified
374          outputFs.setOwner(path, filesUser, filesGroup);
375        }
376        if (filesMode > 0) {
377          outputFs.setPermission(path, new FsPermission(filesMode));
378        }
379      }
380    }
381
382    /**
383     * Try to Preserve the files attribute selected by the user copying them from the source file
384     * This is only required when you are exporting as a different user than "hbase" or on a system
385     * that doesn't have the "hbase" user. This is not considered a blocking failure since the user
386     * can force a chmod with the user that knows is available on the system.
387     */
388    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
389      FileStatus stat;
390      try {
391        stat = outputFs.getFileStatus(path);
392      } catch (IOException e) {
393        LOG.warn("Unable to get the status for file=" + path);
394        return false;
395      }
396
397      try {
398        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
399          outputFs.setPermission(path, new FsPermission(filesMode));
400        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
401          outputFs.setPermission(path, refStat.getPermission());
402        }
403      } catch (IOException e) {
404        LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage());
405        return false;
406      }
407
408      boolean hasRefStat = (refStat != null);
409      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
410      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
411      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
412        try {
413          if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
414            outputFs.setOwner(path, user, group);
415          }
416        } catch (IOException e) {
417          LOG.warn(
418            "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage());
419          LOG.warn("The user/group may not exist on the destination cluster: user=" + user
420            + " group=" + group);
421          return false;
422        }
423      }
424
425      return true;
426    }
427
428    private boolean stringIsNotEmpty(final String str) {
429      return str != null && str.length() > 0;
430    }
431
432    private long copyData(final Context context, final Path inputPath, final InputStream in,
433      final Path outputPath, final FSDataOutputStream out, final long inputFileSize)
434      throws IOException {
435      final String statusMessage =
436        "copied %s/" + StringUtils.humanReadableInt(inputFileSize) + " (%.1f%%)";
437
438      try {
439        byte[] buffer = new byte[bufferSize];
440        long totalBytesWritten = 0;
441        int reportBytes = 0;
442        int bytesRead;
443
444        while ((bytesRead = in.read(buffer)) > 0) {
445          out.write(buffer, 0, bytesRead);
446          totalBytesWritten += bytesRead;
447          reportBytes += bytesRead;
448
449          if (reportBytes >= reportSize) {
450            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
451            context.setStatus(
452              String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten),
453                (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath
454                + " to " + outputPath);
455            reportBytes = 0;
456          }
457        }
458
459        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
460        context
461          .setStatus(String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten),
462            (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to "
463            + outputPath);
464
465        return totalBytesWritten;
466      } finally {
467        out.close();
468        in.close();
469      }
470    }
471
472    /**
473     * Try to open the "source" file. Throws an IOException if the communication with the inputFs
474     * fail or if the file is not found.
475     */
476    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
477      throws IOException {
478      try {
479        Configuration conf = context.getConfiguration();
480        FileLink link = null;
481        switch (fileInfo.getType()) {
482          case HFILE:
483            Path inputPath = new Path(fileInfo.getHfile());
484            link = getFileLink(inputPath, conf);
485            break;
486          case WAL:
487            String serverName = fileInfo.getWalServer();
488            String logName = fileInfo.getWalName();
489            link = new WALLink(inputRoot, serverName, logName);
490            break;
491          default:
492            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
493        }
494        return link.open(inputFs);
495      } catch (IOException e) {
496        context.getCounter(Counter.MISSING_FILES).increment(1);
497        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
498        throw e;
499      }
500    }
501
502    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
503      throws IOException {
504      try {
505        Configuration conf = context.getConfiguration();
506        FileLink link = null;
507        switch (fileInfo.getType()) {
508          case HFILE:
509            Path inputPath = new Path(fileInfo.getHfile());
510            link = getFileLink(inputPath, conf);
511            break;
512          case WAL:
513            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
514            break;
515          default:
516            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
517        }
518        return link.getFileStatus(inputFs);
519      } catch (FileNotFoundException e) {
520        context.getCounter(Counter.MISSING_FILES).increment(1);
521        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
522        throw e;
523      } catch (IOException e) {
524        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
525        throw e;
526      }
527    }
528
529    private FileLink getFileLink(Path path, Configuration conf) throws IOException {
530      String regionName = HFileLink.getReferencedRegionName(path.getName());
531      TableName tableName = HFileLink.getReferencedTableName(path.getName());
532      if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
533        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
534          HFileArchiveUtil.getArchivePath(conf), path);
535      }
536      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
537    }
538
539    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
540      try {
541        return fs.getFileChecksum(path);
542      } catch (IOException e) {
543        LOG.warn("Unable to get checksum for file=" + path, e);
544        return null;
545      }
546    }
547
548    /**
549     * Utility to compare the file length and checksums for the paths specified.
550     */
551    private void verifyCopyResult(final FileStatus inputStat, final FileStatus outputStat)
552      throws IOException {
553      long inputLen = inputStat.getLen();
554      long outputLen = outputStat.getLen();
555      Path inputPath = inputStat.getPath();
556      Path outputPath = outputStat.getPath();
557
558      if (inputLen != outputLen) {
559        throw new IOException("Mismatch in length of input:" + inputPath + " (" + inputLen
560          + ") and output:" + outputPath + " (" + outputLen + ")");
561      }
562
563      // If length==0, we will skip checksum
564      if (inputLen != 0 && verifyChecksum) {
565        FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
566        FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
567
568        ChecksumComparison checksumComparison = verifyChecksum(inChecksum, outChecksum);
569        if (!checksumComparison.equals(ChecksumComparison.TRUE)) {
570          StringBuilder errMessage = new StringBuilder("Checksum mismatch between ")
571            .append(inputPath).append(" and ").append(outputPath).append(".");
572
573          boolean addSkipHint = false;
574          String inputScheme = inputFs.getScheme();
575          String outputScheme = outputFs.getScheme();
576          if (!inputScheme.equals(outputScheme)) {
577            errMessage.append(" Input and output filesystems are of different types.\n")
578              .append("Their checksum algorithms may be incompatible.");
579            addSkipHint = true;
580          } else if (inputStat.getBlockSize() != outputStat.getBlockSize()) {
581            errMessage.append(" Input and output differ in block-size.");
582            addSkipHint = true;
583          } else if (
584            inChecksum != null && outChecksum != null
585              && !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
586          ) {
587            errMessage.append(" Input and output checksum algorithms are of different types.");
588            addSkipHint = true;
589          }
590          if (addSkipHint) {
591            errMessage
592              .append(" You can choose file-level checksum validation via "
593                + "-Ddfs.checksum.combine.mode=COMPOSITE_CRC when block-sizes"
594                + " or filesystems are different.\n")
595              .append(" Or you can skip checksum-checks altogether with -no-checksum-verify,")
596              .append(
597                " for the table backup scenario, you should use -i option to skip checksum-checks.\n")
598              .append(" (NOTE: By skipping checksums, one runs the risk of "
599                + "masking data-corruption during file-transfer.)\n");
600          }
601          throw new IOException(errMessage.toString());
602        }
603      }
604    }
605
606    /**
607     * Utility to compare checksums
608     */
609    private ChecksumComparison verifyChecksum(final FileChecksum inChecksum,
610      final FileChecksum outChecksum) {
611      // If the input or output checksum is null, or the algorithms of input and output are not
612      // equal, that means there is no comparison
613      // and return not compatible. else if matched, return compatible with the matched result.
614      if (
615        inChecksum == null || outChecksum == null
616          || !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
617      ) {
618        return ChecksumComparison.INCOMPATIBLE;
619      } else if (inChecksum.equals(outChecksum)) {
620        return ChecksumComparison.TRUE;
621      }
622      return ChecksumComparison.FALSE;
623    }
624
625    /**
626     * Check if the two files are equal by looking at the file length, and at the checksum (if user
627     * has specified the verifyChecksum flag).
628     */
629    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
630      // Not matching length
631      if (inputStat.getLen() != outputStat.getLen()) return false;
632
633      // Mark files as equals, since user asked for no checksum verification
634      if (!verifyChecksum) return true;
635
636      // If checksums are not available, files are not the same.
637      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
638      if (inChecksum == null) return false;
639
640      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
641      if (outChecksum == null) return false;
642
643      return inChecksum.equals(outChecksum);
644    }
645  }
646
647  // ==========================================================================
648  // Input Format
649  // ==========================================================================
650
651  /**
652   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
653   * @return list of files referenced by the snapshot (pair of path and size)
654   */
655  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
656    final FileSystem fs, final Path snapshotDir) throws IOException {
657    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
658
659    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
660    final TableName table = TableName.valueOf(snapshotDesc.getTable());
661
662    // Get snapshot files
663    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
664    Set<String> addedFiles = new HashSet<>();
665    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
666      new SnapshotReferenceUtil.SnapshotVisitor() {
667        @Override
668        public void storeFile(final RegionInfo regionInfo, final String family,
669          final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
670          Pair<SnapshotFileInfo, Long> snapshotFileAndSize = null;
671          if (!storeFile.hasReference()) {
672            String region = regionInfo.getEncodedName();
673            String hfile = storeFile.getName();
674            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, region, family, hfile,
675              storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
676          } else {
677            Pair<String, String> referredToRegionAndFile =
678              StoreFileInfo.getReferredToRegionAndFile(storeFile.getName());
679            String referencedRegion = referredToRegionAndFile.getFirst();
680            String referencedHFile = referredToRegionAndFile.getSecond();
681            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, referencedRegion, family,
682              referencedHFile, storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
683          }
684          String fileToExport = snapshotFileAndSize.getFirst().getHfile();
685          if (!addedFiles.contains(fileToExport)) {
686            files.add(snapshotFileAndSize);
687            addedFiles.add(fileToExport);
688          } else {
689            LOG.debug("Skip the existing file: {}.", fileToExport);
690          }
691        }
692      });
693
694    return files;
695  }
696
697  private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs,
698    Configuration conf, TableName table, String region, String family, String hfile, long size)
699    throws IOException {
700    Path path = HFileLink.createPath(table, region, family, hfile);
701    SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
702      .setHfile(path.toString()).build();
703    if (size == -1) {
704      size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
705    }
706    return new Pair<>(fileInfo, size);
707  }
708
709  /**
710   * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
711   * The groups created will have similar amounts of bytes.
712   * <p>
713   * The algorithm used is pretty straightforward; the file list is sorted by size, and then each
714   * group fetch the bigger file available, iterating through groups alternating the direction.
715   */
716  static List<List<Pair<SnapshotFileInfo, Long>>>
717    getBalancedSplits(final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
718    // Sort files by size, from small to big
719    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
720      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
721        long r = a.getSecond() - b.getSecond();
722        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
723      }
724    });
725
726    // create balanced groups
727    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
728    long[] sizeGroups = new long[ngroups];
729    int hi = files.size() - 1;
730    int lo = 0;
731
732    List<Pair<SnapshotFileInfo, Long>> group;
733    int dir = 1;
734    int g = 0;
735
736    while (hi >= lo) {
737      if (g == fileGroups.size()) {
738        group = new LinkedList<>();
739        fileGroups.add(group);
740      } else {
741        group = fileGroups.get(g);
742      }
743
744      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
745
746      // add the hi one
747      sizeGroups[g] += fileInfo.getSecond();
748      group.add(fileInfo);
749
750      // change direction when at the end or the beginning
751      g += dir;
752      if (g == ngroups) {
753        dir = -1;
754        g = ngroups - 1;
755      } else if (g < 0) {
756        dir = 1;
757        g = 0;
758      }
759    }
760
761    if (LOG.isDebugEnabled()) {
762      for (int i = 0; i < sizeGroups.length; ++i) {
763        LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
764      }
765    }
766
767    return fileGroups;
768  }
769
770  private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
771    @Override
772    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
773      TaskAttemptContext tac) throws IOException, InterruptedException {
774      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys());
775    }
776
777    @Override
778    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
779      Configuration conf = context.getConfiguration();
780      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
781      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
782
783      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
784      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
785      if (mappers == 0 && snapshotFiles.size() > 0) {
786        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
787        mappers = Math.min(mappers, snapshotFiles.size());
788        conf.setInt(CONF_NUM_SPLITS, mappers);
789        conf.setInt(MR_NUM_MAPS, mappers);
790      }
791
792      List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
793      List<InputSplit> splits = new ArrayList(groups.size());
794      for (List<Pair<SnapshotFileInfo, Long>> files : groups) {
795        splits.add(new ExportSnapshotInputSplit(files));
796      }
797      return splits;
798    }
799
800    private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
801      private List<Pair<BytesWritable, Long>> files;
802      private long length;
803
804      public ExportSnapshotInputSplit() {
805        this.files = null;
806      }
807
808      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
809        this.files = new ArrayList(snapshotFiles.size());
810        for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) {
811          this.files.add(
812            new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
813          this.length += fileInfo.getSecond();
814        }
815      }
816
817      private List<Pair<BytesWritable, Long>> getSplitKeys() {
818        return files;
819      }
820
821      @Override
822      public long getLength() throws IOException, InterruptedException {
823        return length;
824      }
825
826      @Override
827      public String[] getLocations() throws IOException, InterruptedException {
828        return new String[] {};
829      }
830
831      @Override
832      public void readFields(DataInput in) throws IOException {
833        int count = in.readInt();
834        files = new ArrayList<>(count);
835        length = 0;
836        for (int i = 0; i < count; ++i) {
837          BytesWritable fileInfo = new BytesWritable();
838          fileInfo.readFields(in);
839          long size = in.readLong();
840          files.add(new Pair<>(fileInfo, size));
841          length += size;
842        }
843      }
844
845      @Override
846      public void write(DataOutput out) throws IOException {
847        out.writeInt(files.size());
848        for (final Pair<BytesWritable, Long> fileInfo : files) {
849          fileInfo.getFirst().write(out);
850          out.writeLong(fileInfo.getSecond());
851        }
852      }
853    }
854
855    private static class ExportSnapshotRecordReader
856      extends RecordReader<BytesWritable, NullWritable> {
857      private final List<Pair<BytesWritable, Long>> files;
858      private long totalSize = 0;
859      private long procSize = 0;
860      private int index = -1;
861
862      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
863        this.files = files;
864        for (Pair<BytesWritable, Long> fileInfo : files) {
865          totalSize += fileInfo.getSecond();
866        }
867      }
868
869      @Override
870      public void close() {
871      }
872
873      @Override
874      public BytesWritable getCurrentKey() {
875        return files.get(index).getFirst();
876      }
877
878      @Override
879      public NullWritable getCurrentValue() {
880        return NullWritable.get();
881      }
882
883      @Override
884      public float getProgress() {
885        return (float) procSize / totalSize;
886      }
887
888      @Override
889      public void initialize(InputSplit split, TaskAttemptContext tac) {
890      }
891
892      @Override
893      public boolean nextKeyValue() {
894        if (index >= 0) {
895          procSize += files.get(index).getSecond();
896        }
897        return (++index < files.size());
898      }
899    }
900  }
901
902  // ==========================================================================
903  // Tool
904  // ==========================================================================
905
906  /**
907   * Run Map-Reduce Job to perform the files copy.
908   */
909  private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName,
910    final Path snapshotDir, final boolean verifyChecksum, final String filesUser,
911    final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB)
912    throws IOException, InterruptedException, ClassNotFoundException {
913    Configuration conf = getConf();
914    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
915    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
916    if (mappers > 0) {
917      conf.setInt(CONF_NUM_SPLITS, mappers);
918      conf.setInt(MR_NUM_MAPS, mappers);
919    }
920    conf.setInt(CONF_FILES_MODE, filesMode);
921    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
922    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
923    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
924    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
925    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
926    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
927
928    String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
929    Job job = new Job(conf);
930    job.setJobName(jobname);
931    job.setJarByClass(ExportSnapshot.class);
932    TableMapReduceUtil.addDependencyJars(job);
933    job.setMapperClass(ExportMapper.class);
934    job.setInputFormatClass(ExportSnapshotInputFormat.class);
935    job.setOutputFormatClass(NullOutputFormat.class);
936    job.setMapSpeculativeExecution(false);
937    job.setNumReduceTasks(0);
938
939    // Acquire the delegation Tokens
940    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
941    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf);
942    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
943    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf);
944
945    // Run the MR Job
946    if (!job.waitForCompletion(true)) {
947      throw new ExportSnapshotException(job.getStatus().getFailureInfo());
948    }
949  }
950
951  private void verifySnapshot(final SnapshotDescription snapshotDesc, final Configuration baseConf,
952    final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
953    // Update the conf with the current root dir, since may be a different cluster
954    Configuration conf = new Configuration(baseConf);
955    CommonFSUtils.setRootDir(conf, rootDir);
956    CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf));
957    boolean isExpired = SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDesc.getTtl(),
958      snapshotDesc.getCreationTime(), EnvironmentEdgeManager.currentTime());
959    if (isExpired) {
960      throw new SnapshotTTLExpiredException(ProtobufUtil.createSnapshotDesc(snapshotDesc));
961    }
962    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
963  }
964
965  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
966    BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
967    ExecutorService pool = Executors
968      .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
969    List<Future<Void>> futures = new ArrayList<>();
970    for (Path dstPath : traversedPath) {
971      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
972      futures.add(future);
973    }
974    try {
975      for (Future<Void> future : futures) {
976        future.get();
977      }
978    } catch (InterruptedException | ExecutionException e) {
979      throw new IOException(e);
980    } finally {
981      pool.shutdownNow();
982    }
983  }
984
985  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
986    Configuration conf, List<Path> traversedPath) throws IOException {
987    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
988      try {
989        fs.setOwner(path, filesUser, filesGroup);
990      } catch (IOException e) {
991        throw new RuntimeException(
992          "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e);
993      }
994    }, conf);
995  }
996
997  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
998    final List<Path> traversedPath, final Configuration conf) throws IOException {
999    if (filesMode <= 0) {
1000      return;
1001    }
1002    FsPermission perm = new FsPermission(filesMode);
1003    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
1004      try {
1005        fs.setPermission(path, perm);
1006      } catch (IOException e) {
1007        throw new RuntimeException(
1008          "set permission for file " + path + " to " + filesMode + " failed", e);
1009      }
1010    }, conf);
1011  }
1012
1013  private boolean verifyTarget = true;
1014  private boolean verifySource = true;
1015  private boolean verifyChecksum = true;
1016  private String snapshotName = null;
1017  private String targetName = null;
1018  private boolean overwrite = false;
1019  private String filesGroup = null;
1020  private String filesUser = null;
1021  private Path outputRoot = null;
1022  private Path inputRoot = null;
1023  private int bandwidthMB = Integer.MAX_VALUE;
1024  private int filesMode = 0;
1025  private int mappers = 0;
1026  private boolean resetTtl = false;
1027
1028  @Override
1029  protected void processOptions(CommandLine cmd) {
1030    snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName);
1031    targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName);
1032    if (cmd.hasOption(Options.COPY_TO.getLongOpt())) {
1033      outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt()));
1034    }
1035    if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
1036      inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
1037    }
1038    mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
1039    filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
1040    filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
1041    filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8);
1042    bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB);
1043    overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt());
1044    // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...).
1045    verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
1046    verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
1047    verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt());
1048    resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt());
1049  }
1050
1051  /**
1052   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
1053   * @return 0 on success, and != 0 upon failure.
1054   */
1055  @Override
1056  public int doWork() throws IOException {
1057    Configuration conf = getConf();
1058
1059    // Check user options
1060    if (snapshotName == null) {
1061      System.err.println("Snapshot name not provided.");
1062      LOG.error("Use -h or --help for usage instructions.");
1063      return EXIT_FAILURE;
1064    }
1065
1066    if (outputRoot == null) {
1067      System.err
1068        .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided.");
1069      LOG.error("Use -h or --help for usage instructions.");
1070      return EXIT_FAILURE;
1071    }
1072
1073    if (targetName == null) {
1074      targetName = snapshotName;
1075    }
1076    if (inputRoot == null) {
1077      inputRoot = CommonFSUtils.getRootDir(conf);
1078    } else {
1079      CommonFSUtils.setRootDir(conf, inputRoot);
1080    }
1081
1082    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
1083    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
1084    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
1085    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
1086    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false)
1087      || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null;
1088    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
1089    Path snapshotTmpDir =
1090      SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf);
1091    Path outputSnapshotDir =
1092      SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
1093    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
1094    LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot);
1095    LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs,
1096      outputRoot.toString(), skipTmp, initialOutputSnapshotDir);
1097
1098    // throw CorruptedSnapshotException if we can't read the snapshot info.
1099    SnapshotDescription sourceSnapshotDesc =
1100      SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir);
1101
1102    // Verify snapshot source before copying files
1103    if (verifySource) {
1104      LOG.info("Verify the source snapshot's expiration status and integrity.");
1105      verifySnapshot(sourceSnapshotDesc, srcConf, inputFs, inputRoot, snapshotDir);
1106    }
1107
1108    // Find the necessary directory which need to change owner and group
1109    Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
1110    if (outputFs.exists(needSetOwnerDir)) {
1111      if (skipTmp) {
1112        needSetOwnerDir = outputSnapshotDir;
1113      } else {
1114        needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf);
1115        if (outputFs.exists(needSetOwnerDir)) {
1116          needSetOwnerDir = snapshotTmpDir;
1117        }
1118      }
1119    }
1120
1121    // Check if the snapshot already exists
1122    if (outputFs.exists(outputSnapshotDir)) {
1123      if (overwrite) {
1124        if (!outputFs.delete(outputSnapshotDir, true)) {
1125          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
1126          return EXIT_FAILURE;
1127        }
1128      } else {
1129        System.err.println("The snapshot '" + targetName + "' already exists in the destination: "
1130          + outputSnapshotDir);
1131        return EXIT_FAILURE;
1132      }
1133    }
1134
1135    if (!skipTmp) {
1136      // Check if the snapshot already in-progress
1137      if (outputFs.exists(snapshotTmpDir)) {
1138        if (overwrite) {
1139          if (!outputFs.delete(snapshotTmpDir, true)) {
1140            System.err
1141              .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir);
1142            return EXIT_FAILURE;
1143          }
1144        } else {
1145          System.err
1146            .println("A snapshot with the same name '" + targetName + "' may be in-progress");
1147          System.err
1148            .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, ");
1149          System.err
1150            .println("consider removing " + snapshotTmpDir + " by using the -overwrite option");
1151          return EXIT_FAILURE;
1152        }
1153      }
1154    }
1155
1156    // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
1157    // The snapshot references must be copied before the hfiles otherwise the cleaner
1158    // will remove them because they are unreferenced.
1159    List<Path> travesedPaths = new ArrayList<>();
1160    boolean copySucceeded = false;
1161    try {
1162      LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir);
1163      travesedPaths =
1164        FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
1165          conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
1166      copySucceeded = true;
1167    } catch (IOException e) {
1168      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir
1169        + " to=" + initialOutputSnapshotDir, e);
1170    } finally {
1171      if (copySucceeded) {
1172        if (filesUser != null || filesGroup != null) {
1173          LOG.warn(
1174            (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser)
1175              + (filesGroup == null
1176                ? ""
1177                : ", Change the group of " + needSetOwnerDir + " to " + filesGroup));
1178          setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths);
1179        }
1180        if (filesMode > 0) {
1181          LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
1182          setPermissionParallel(outputFs, (short) filesMode, travesedPaths, conf);
1183        }
1184      }
1185    }
1186
1187    // Write a new .snapshotinfo if the target name is different from the source name or we want to
1188    // reset TTL for target snapshot.
1189    if (!targetName.equals(snapshotName) || resetTtl) {
1190      SnapshotDescription.Builder snapshotDescBuilder =
1191        SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder();
1192      if (!targetName.equals(snapshotName)) {
1193        snapshotDescBuilder.setName(targetName);
1194      }
1195      if (resetTtl) {
1196        snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL);
1197      }
1198      SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(),
1199        initialOutputSnapshotDir, outputFs);
1200      if (filesUser != null || filesGroup != null) {
1201        outputFs.setOwner(
1202          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser,
1203          filesGroup);
1204      }
1205      if (filesMode > 0) {
1206        outputFs.setPermission(
1207          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE),
1208          new FsPermission((short) filesMode));
1209      }
1210    }
1211
1212    // Step 2 - Start MR Job to copy files
1213    // The snapshot references must be copied before the files otherwise the files gets removed
1214    // by the HFileArchiver, since they have no references.
1215    try {
1216      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser,
1217        filesGroup, filesMode, mappers, bandwidthMB);
1218
1219      LOG.info("Finalize the Snapshot Export");
1220      if (!skipTmp) {
1221        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
1222        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1223          throw new ExportSnapshotException("Unable to rename snapshot directory from="
1224            + snapshotTmpDir + " to=" + outputSnapshotDir);
1225        }
1226      }
1227
1228      // Step 4 - Verify snapshot integrity
1229      if (verifyTarget) {
1230        LOG.info("Verify the exported snapshot's expiration status and integrity.");
1231        SnapshotDescription targetSnapshotDesc =
1232          SnapshotDescriptionUtils.readSnapshotInfo(outputFs, outputSnapshotDir);
1233        verifySnapshot(targetSnapshotDesc, destConf, outputFs, outputRoot, outputSnapshotDir);
1234      }
1235
1236      LOG.info("Export Completed: " + targetName);
1237      return EXIT_SUCCESS;
1238    } catch (Exception e) {
1239      LOG.error("Snapshot export failed", e);
1240      if (!skipTmp) {
1241        outputFs.delete(snapshotTmpDir, true);
1242      }
1243      outputFs.delete(outputSnapshotDir, true);
1244      return EXIT_FAILURE;
1245    }
1246  }
1247
1248  @Override
1249  protected void printUsage() {
1250    super.printUsage();
1251    System.out.println("\n" + "Examples:\n" + "  hbase snapshot export \\\n"
1252      + "    --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n"
1253      + "    --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" + "\n"
1254      + "  hbase snapshot export \\\n"
1255      + "    --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n"
1256      + "    --copy-to hdfs://srv1:50070/hbase");
1257  }
1258
1259  @Override
1260  protected void addOptions() {
1261    addRequiredOption(Options.SNAPSHOT);
1262    addOption(Options.COPY_TO);
1263    addOption(Options.COPY_FROM);
1264    addOption(Options.TARGET_NAME);
1265    addOption(Options.NO_CHECKSUM_VERIFY);
1266    addOption(Options.NO_TARGET_VERIFY);
1267    addOption(Options.NO_SOURCE_VERIFY);
1268    addOption(Options.OVERWRITE);
1269    addOption(Options.CHUSER);
1270    addOption(Options.CHGROUP);
1271    addOption(Options.CHMOD);
1272    addOption(Options.MAPPERS);
1273    addOption(Options.BANDWIDTH);
1274    addOption(Options.RESET_TTL);
1275  }
1276
1277  public static void main(String[] args) {
1278    new ExportSnapshot().doStaticMain(args);
1279  }
1280}