001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.snapshot;
019
020import java.io.BufferedInputStream;
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.io.InputStream;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.Comparator;
029import java.util.LinkedList;
030import java.util.List;
031import java.util.concurrent.ExecutionException;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.Executors;
034import java.util.concurrent.Future;
035import java.util.function.BiConsumer;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FSDataInputStream;
038import org.apache.hadoop.fs.FSDataOutputStream;
039import org.apache.hadoop.fs.FileChecksum;
040import org.apache.hadoop.fs.FileStatus;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.fs.permission.FsPermission;
044import org.apache.hadoop.hbase.HBaseConfiguration;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.io.FileLink;
049import org.apache.hadoop.hbase.io.HFileLink;
050import org.apache.hadoop.hbase.io.WALLink;
051import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
052import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
053import org.apache.hadoop.hbase.mob.MobUtils;
054import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
055import org.apache.hadoop.hbase.util.AbstractHBaseTool;
056import org.apache.hadoop.hbase.util.CommonFSUtils;
057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
058import org.apache.hadoop.hbase.util.FSUtils;
059import org.apache.hadoop.hbase.util.HFileArchiveUtil;
060import org.apache.hadoop.hbase.util.Pair;
061import org.apache.hadoop.io.BytesWritable;
062import org.apache.hadoop.io.NullWritable;
063import org.apache.hadoop.io.Writable;
064import org.apache.hadoop.mapreduce.InputFormat;
065import org.apache.hadoop.mapreduce.InputSplit;
066import org.apache.hadoop.mapreduce.Job;
067import org.apache.hadoop.mapreduce.JobContext;
068import org.apache.hadoop.mapreduce.Mapper;
069import org.apache.hadoop.mapreduce.RecordReader;
070import org.apache.hadoop.mapreduce.TaskAttemptContext;
071import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
072import org.apache.hadoop.mapreduce.security.TokenCache;
073import org.apache.hadoop.util.StringUtils;
074import org.apache.hadoop.util.Tool;
075import org.apache.yetus.audience.InterfaceAudience;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078
079import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
080import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
081
082import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
086
087/**
088 * Export the specified snapshot to a given FileSystem. The .snapshot/name folder is copied to the
089 * destination cluster and then all the hfiles/wals are copied using a Map-Reduce Job in the
090 * .archive/ location. When everything is done, the second cluster can restore the snapshot.
091 */
092@InterfaceAudience.Public
093public class ExportSnapshot extends AbstractHBaseTool implements Tool {
  /** Tool name; also the base of the two configuration prefixes below. */
  public static final String NAME = "exportsnapshot";
  /** Configuration prefix for overrides for the source filesystem */
  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
  /** Configuration prefix for overrides for the destination filesystem */
  public static final String CONF_DEST_PREFIX = NAME + ".to.";

  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);

  // Job configuration keys. Keys without a comment are not read by the ExportMapper code;
  // NOTE(review): their consumers are presumably elsewhere in this class -- confirm there.
  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
  /** Owner to apply to the exported files and folders (read in ExportMapper.setup). */
  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
  /** Group to apply to the exported files and folders (read in ExportMapper.setup). */
  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
  /** Permission bits to apply to the exported files and folders; the mapper defaults to 0. */
  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
  /** Whether the mapper compares checksums; the mapper defaults to true. */
  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
  /** Root path of the destination, used by the mapper to locate the output archive. */
  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
  /** Root path of the source, used by the mapper to locate the input archive. */
  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
  /** Size in bytes of the copy buffer allocated by the mapper. */
  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
  /** Number of copied bytes after which the mapper updates its counter and status. */
  private static final String CONF_REPORT_SIZE = "snapshot.export.report.size";
  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
  /**
   * Read throttle in MB/second; the mapper defaults to 100 and skips throttling when the value is
   * Integer.MAX_VALUE.
   */
  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
  private static final String CONF_COPY_MANIFEST_THREADS =
    "snapshot.export.copy.references.threads";
  /** Defaults to the number of processors available to the JVM when this class is loaded. */
  private static final int DEFAULT_COPY_MANIFEST_THREADS =
    Runtime.getRuntime().availableProcessors();
122
  /**
   * Failure-injection state used by ExportMapper when {@link #CONF_TEST_FAILURE} is enabled in the
   * job configuration.
   */
  static class Testing {
    /** Boolean configuration key that switches failure injection on. */
    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
    /** Configuration key holding the number of failures to inject. */
    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
    /** Upper bound on injected failures, loaded from {@link #CONF_TEST_FAILURE_COUNT}. */
    int failuresCountToInject = 0;
    /** Failures injected so far; ExportMapper.setup seeds it with the task attempt id. */
    int injectedFailureCount = 0;
  }
129
  // Command line options and defaults.
  /**
   * Long-only command line options of the tool (no short names). Options created with
   * {@code true} as third argument take a value, the others are plain flags.
   */
  static final class Options {
    // NOTE(review): the description says "restore" although this tool exports the snapshot;
    // looks like a copy/paste leftover -- confirm before changing the user-facing text.
    static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore.");
    static final Option TARGET_NAME =
      new Option(null, "target", true, "Target name for the snapshot.");
    static final Option COPY_TO =
      new Option(null, "copy-to", true, "Remote " + "destination hdfs://");
    static final Option COPY_FROM =
      new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)");
    static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
      "Do not verify checksum, use name+length only.");
    static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
      "Do not verify the exported snapshot's expiration status and integrity.");
    static final Option NO_SOURCE_VERIFY = new Option(null, "no-source-verify", false,
      "Do not verify the source snapshot's expiration status and integrity.");
    static final Option OVERWRITE =
      new Option(null, "overwrite", false, "Rewrite the snapshot manifest if already exists.");
    static final Option CHUSER =
      new Option(null, "chuser", true, "Change the owner of the files to the specified one.");
    static final Option CHGROUP =
      new Option(null, "chgroup", true, "Change the group of the files to the specified one.");
    static final Option CHMOD =
      new Option(null, "chmod", true, "Change the permission of the files to the specified one.");
    static final Option MAPPERS = new Option(null, "mappers", true,
      "Number of mappers to use during the copy (mapreduce.job.maps).");
    static final Option BANDWIDTH =
      new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second.");
    static final Option RESET_TTL =
      new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot");
  }
160
  // Export Map-Reduce Counters, to keep track of the progress
  public enum Counter {
    /** Incremented when a source file cannot be opened or its status lookup reports not found. */
    MISSING_FILES,
    /** Incremented once per file whose copy and verification completed. */
    FILES_COPIED,
    /** Incremented once per file skipped because the destination already holds the same file. */
    FILES_SKIPPED,
    /** Incremented when a copy fails with an IOException (or a test failure is injected). */
    COPY_FAILED,
    /** Sum of the source lengths of the files the mapper started copying. */
    BYTES_EXPECTED,
    /** Sum of the source lengths of the skipped files. */
    BYTES_SKIPPED,
    /** Number of bytes written to the destination so far. */
    BYTES_COPIED
  }
171
  /**
   * Indicates the checksum comparison result.
   */
  public enum ChecksumComparison {
    /** The checksums are comparable and equal. */
    TRUE, // checksum comparison is compatible and true.
    /** The checksums are comparable but differ. */
    FALSE, // checksum comparison is compatible and false.
    /** A checksum is missing or the two algorithms differ, so no comparison is possible. */
    INCOMPATIBLE, // checksum comparison is not compatible.
  }
180
181  private static class ExportMapper
182    extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
    private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
    /** Default for CONF_REPORT_SIZE: bytes copied between two progress reports. */
    final static int REPORT_SIZE = 1 * 1024 * 1024;
    /** Lower bound of the default copy buffer size, in bytes. */
    final static int BUFFER_SIZE = 64 * 1024;

    // The fields below are initialized in setup() from the job configuration.
    private boolean verifyChecksum;
    private String filesGroup;
    private String filesUser;
    private short filesMode;
    private int bufferSize;
    private int reportSize;

    // Destination filesystem, its archive directory and its root
    private FileSystem outputFs;
    private Path outputArchive;
    private Path outputRoot;

    // Source filesystem, its archive directory and its root
    private FileSystem inputFs;
    private Path inputArchive;
    private Path inputRoot;

    // Failure-injection state; static, so it is shared by every mapper instance of this class
    private static Testing testing = new Testing();
203
204    @Override
205    public void setup(Context context) throws IOException {
206      Configuration conf = context.getConfiguration();
207
208      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
209      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
210
211      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
212
213      filesGroup = conf.get(CONF_FILES_GROUP);
214      filesUser = conf.get(CONF_FILES_USER);
215      filesMode = (short) conf.getInt(CONF_FILES_MODE, 0);
216      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
217      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
218
219      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
220      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
221
222      try {
223        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
224      } catch (IOException e) {
225        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
226      }
227
228      try {
229        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
230      } catch (IOException e) {
231        throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e);
232      }
233
234      // Use the default block size of the outputFs if bigger
235      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
236      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
237      LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
238      reportSize = conf.getInt(CONF_REPORT_SIZE, REPORT_SIZE);
239
240      for (Counter c : Counter.values()) {
241        context.getCounter(c).increment(0);
242      }
243      if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
244        testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
245        // Get number of times we have already injected failure based on attempt number of this
246        // task.
247        testing.injectedFailureCount = context.getTaskAttemptID().getId();
248      }
249    }
250
251    @Override
252    public void map(BytesWritable key, NullWritable value, Context context)
253      throws InterruptedException, IOException {
254      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
255      Path outputPath = getOutputPath(inputInfo);
256
257      copyFile(context, inputInfo, outputPath);
258    }
259
260    /**
261     * Returns the location where the inputPath will be copied.
262     */
263    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
264      Path path = null;
265      switch (inputInfo.getType()) {
266        case HFILE:
267          Path inputPath = new Path(inputInfo.getHfile());
268          String family = inputPath.getParent().getName();
269          TableName table = HFileLink.getReferencedTableName(inputPath.getName());
270          String region = HFileLink.getReferencedRegionName(inputPath.getName());
271          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
272          path = new Path(CommonFSUtils.getTableDir(new Path("./"), table),
273            new Path(region, new Path(family, hfile)));
274          break;
275        case WAL:
276          LOG.warn("snapshot does not keeps WALs: " + inputInfo);
277          break;
278        default:
279          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
280      }
281      return new Path(outputArchive, path);
282    }
283
284    @SuppressWarnings("checkstyle:linelength")
285    /**
286     * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in
287     * {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}.
288     */
289    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
290      throws IOException {
291      if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return;
292      if (testing.injectedFailureCount >= testing.failuresCountToInject) return;
293      testing.injectedFailureCount++;
294      context.getCounter(Counter.COPY_FAILED).increment(1);
295      LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
296      throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s",
297        testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
298    }
299
300    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
301      final Path outputPath) throws IOException {
302      // Get the file information
303      FileStatus inputStat = getSourceFileStatus(context, inputInfo);
304
305      // Verify if the output file exists and is the same that we want to copy
306      if (outputFs.exists(outputPath)) {
307        FileStatus outputStat = outputFs.getFileStatus(outputPath);
308        if (outputStat != null && sameFile(inputStat, outputStat)) {
309          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
310          context.getCounter(Counter.FILES_SKIPPED).increment(1);
311          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
312          return;
313        }
314      }
315
316      InputStream in = openSourceFile(context, inputInfo);
317      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
318      if (Integer.MAX_VALUE != bandwidthMB) {
319        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
320      }
321
322      Path inputPath = inputStat.getPath();
323      try {
324        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
325
326        // Ensure that the output folder is there and copy the file
327        createOutputPath(outputPath.getParent());
328        FSDataOutputStream out = outputFs.create(outputPath, true);
329
330        long stime = EnvironmentEdgeManager.currentTime();
331        long totalBytesWritten =
332          copyData(context, inputPath, in, outputPath, out, inputStat.getLen());
333
334        // Verify the file length and checksum
335        verifyCopyResult(inputStat, outputFs.getFileStatus(outputPath));
336
337        long etime = EnvironmentEdgeManager.currentTime();
338        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
339        LOG
340          .info("size=" + totalBytesWritten + " (" + StringUtils.humanReadableInt(totalBytesWritten)
341            + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String
342              .format(" %.3fM/sec", (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0));
343        context.getCounter(Counter.FILES_COPIED).increment(1);
344
345        // Try to Preserve attributes
346        if (!preserveAttributes(outputPath, inputStat)) {
347          LOG.warn("You may have to run manually chown on: " + outputPath);
348        }
349      } catch (IOException e) {
350        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
351        context.getCounter(Counter.COPY_FAILED).increment(1);
352        throw e;
353      } finally {
354        injectTestFailure(context, inputInfo);
355      }
356    }
357
358    /**
359     * Create the output folder and optionally set ownership.
360     */
361    private void createOutputPath(final Path path) throws IOException {
362      if (filesUser == null && filesGroup == null) {
363        outputFs.mkdirs(path);
364      } else {
365        Path parent = path.getParent();
366        if (!outputFs.exists(parent) && !parent.isRoot()) {
367          createOutputPath(parent);
368        }
369        outputFs.mkdirs(path);
370        if (filesUser != null || filesGroup != null) {
371          // override the owner when non-null user/group is specified
372          outputFs.setOwner(path, filesUser, filesGroup);
373        }
374        if (filesMode > 0) {
375          outputFs.setPermission(path, new FsPermission(filesMode));
376        }
377      }
378    }
379
380    /**
381     * Try to Preserve the files attribute selected by the user copying them from the source file
382     * This is only required when you are exporting as a different user than "hbase" or on a system
383     * that doesn't have the "hbase" user. This is not considered a blocking failure since the user
384     * can force a chmod with the user that knows is available on the system.
385     */
386    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
387      FileStatus stat;
388      try {
389        stat = outputFs.getFileStatus(path);
390      } catch (IOException e) {
391        LOG.warn("Unable to get the status for file=" + path);
392        return false;
393      }
394
395      try {
396        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
397          outputFs.setPermission(path, new FsPermission(filesMode));
398        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
399          outputFs.setPermission(path, refStat.getPermission());
400        }
401      } catch (IOException e) {
402        LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage());
403        return false;
404      }
405
406      boolean hasRefStat = (refStat != null);
407      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
408      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
409      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
410        try {
411          if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
412            outputFs.setOwner(path, user, group);
413          }
414        } catch (IOException e) {
415          LOG.warn(
416            "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage());
417          LOG.warn("The user/group may not exist on the destination cluster: user=" + user
418            + " group=" + group);
419          return false;
420        }
421      }
422
423      return true;
424    }
425
426    private boolean stringIsNotEmpty(final String str) {
427      return str != null && str.length() > 0;
428    }
429
430    private long copyData(final Context context, final Path inputPath, final InputStream in,
431      final Path outputPath, final FSDataOutputStream out, final long inputFileSize)
432      throws IOException {
433      final String statusMessage =
434        "copied %s/" + StringUtils.humanReadableInt(inputFileSize) + " (%.1f%%)";
435
436      try {
437        byte[] buffer = new byte[bufferSize];
438        long totalBytesWritten = 0;
439        int reportBytes = 0;
440        int bytesRead;
441
442        while ((bytesRead = in.read(buffer)) > 0) {
443          out.write(buffer, 0, bytesRead);
444          totalBytesWritten += bytesRead;
445          reportBytes += bytesRead;
446
447          if (reportBytes >= reportSize) {
448            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
449            context.setStatus(
450              String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten),
451                (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath
452                + " to " + outputPath);
453            reportBytes = 0;
454          }
455        }
456
457        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
458        context
459          .setStatus(String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten),
460            (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to "
461            + outputPath);
462
463        return totalBytesWritten;
464      } finally {
465        out.close();
466        in.close();
467      }
468    }
469
470    /**
471     * Try to open the "source" file. Throws an IOException if the communication with the inputFs
472     * fail or if the file is not found.
473     */
474    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
475      throws IOException {
476      try {
477        Configuration conf = context.getConfiguration();
478        FileLink link = null;
479        switch (fileInfo.getType()) {
480          case HFILE:
481            Path inputPath = new Path(fileInfo.getHfile());
482            link = getFileLink(inputPath, conf);
483            break;
484          case WAL:
485            String serverName = fileInfo.getWalServer();
486            String logName = fileInfo.getWalName();
487            link = new WALLink(inputRoot, serverName, logName);
488            break;
489          default:
490            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
491        }
492        return link.open(inputFs);
493      } catch (IOException e) {
494        context.getCounter(Counter.MISSING_FILES).increment(1);
495        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
496        throw e;
497      }
498    }
499
500    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
501      throws IOException {
502      try {
503        Configuration conf = context.getConfiguration();
504        FileLink link = null;
505        switch (fileInfo.getType()) {
506          case HFILE:
507            Path inputPath = new Path(fileInfo.getHfile());
508            link = getFileLink(inputPath, conf);
509            break;
510          case WAL:
511            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
512            break;
513          default:
514            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
515        }
516        return link.getFileStatus(inputFs);
517      } catch (FileNotFoundException e) {
518        context.getCounter(Counter.MISSING_FILES).increment(1);
519        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
520        throw e;
521      } catch (IOException e) {
522        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
523        throw e;
524      }
525    }
526
527    private FileLink getFileLink(Path path, Configuration conf) throws IOException {
528      String regionName = HFileLink.getReferencedRegionName(path.getName());
529      TableName tableName = HFileLink.getReferencedTableName(path.getName());
530      if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
531        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
532          HFileArchiveUtil.getArchivePath(conf), path);
533      }
534      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
535    }
536
537    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
538      try {
539        return fs.getFileChecksum(path);
540      } catch (IOException e) {
541        LOG.warn("Unable to get checksum for file=" + path, e);
542        return null;
543      }
544    }
545
546    /**
547     * Utility to compare the file length and checksums for the paths specified.
548     */
549    private void verifyCopyResult(final FileStatus inputStat, final FileStatus outputStat)
550      throws IOException {
551      long inputLen = inputStat.getLen();
552      long outputLen = outputStat.getLen();
553      Path inputPath = inputStat.getPath();
554      Path outputPath = outputStat.getPath();
555
556      if (inputLen != outputLen) {
557        throw new IOException("Mismatch in length of input:" + inputPath + " (" + inputLen
558          + ") and output:" + outputPath + " (" + outputLen + ")");
559      }
560
561      // If length==0, we will skip checksum
562      if (inputLen != 0 && verifyChecksum) {
563        FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
564        FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
565
566        ChecksumComparison checksumComparison = verifyChecksum(inChecksum, outChecksum);
567        if (!checksumComparison.equals(ChecksumComparison.TRUE)) {
568          StringBuilder errMessage = new StringBuilder("Checksum mismatch between ")
569            .append(inputPath).append(" and ").append(outputPath).append(".");
570
571          boolean addSkipHint = false;
572          String inputScheme = inputFs.getScheme();
573          String outputScheme = outputFs.getScheme();
574          if (!inputScheme.equals(outputScheme)) {
575            errMessage.append(" Input and output filesystems are of different types.\n")
576              .append("Their checksum algorithms may be incompatible.");
577            addSkipHint = true;
578          } else if (inputStat.getBlockSize() != outputStat.getBlockSize()) {
579            errMessage.append(" Input and output differ in block-size.");
580            addSkipHint = true;
581          } else if (
582            inChecksum != null && outChecksum != null
583              && !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
584          ) {
585            errMessage.append(" Input and output checksum algorithms are of different types.");
586            addSkipHint = true;
587          }
588          if (addSkipHint) {
589            errMessage
590              .append(" You can choose file-level checksum validation via "
591                + "-Ddfs.checksum.combine.mode=COMPOSITE_CRC when block-sizes"
592                + " or filesystems are different.\n")
593              .append(" Or you can skip checksum-checks altogether with -no-checksum-verify,")
594              .append(
595                " for the table backup scenario, you should use -i option to skip checksum-checks.\n")
596              .append(" (NOTE: By skipping checksums, one runs the risk of "
597                + "masking data-corruption during file-transfer.)\n");
598          }
599          throw new IOException(errMessage.toString());
600        }
601      }
602    }
603
604    /**
605     * Utility to compare checksums
606     */
607    private ChecksumComparison verifyChecksum(final FileChecksum inChecksum,
608      final FileChecksum outChecksum) {
609      // If the input or output checksum is null, or the algorithms of input and output are not
610      // equal, that means there is no comparison
611      // and return not compatible. else if matched, return compatible with the matched result.
612      if (
613        inChecksum == null || outChecksum == null
614          || !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
615      ) {
616        return ChecksumComparison.INCOMPATIBLE;
617      } else if (inChecksum.equals(outChecksum)) {
618        return ChecksumComparison.TRUE;
619      }
620      return ChecksumComparison.FALSE;
621    }
622
623    /**
624     * Check if the two files are equal by looking at the file length, and at the checksum (if user
625     * has specified the verifyChecksum flag).
626     */
627    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
628      // Not matching length
629      if (inputStat.getLen() != outputStat.getLen()) return false;
630
631      // Mark files as equals, since user asked for no checksum verification
632      if (!verifyChecksum) return true;
633
634      // If checksums are not available, files are not the same.
635      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
636      if (inChecksum == null) return false;
637
638      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
639      if (outChecksum == null) return false;
640
641      return inChecksum.equals(outChecksum);
642    }
643  }
644
645  // ==========================================================================
646  // Input Format
647  // ==========================================================================
648
649  /**
650   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
651   * @return list of files referenced by the snapshot (pair of path and size)
652   */
653  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
654    final FileSystem fs, final Path snapshotDir) throws IOException {
655    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
656
657    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
658    final TableName table = TableName.valueOf(snapshotDesc.getTable());
659
660    // Get snapshot files
661    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
662    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
663      new SnapshotReferenceUtil.SnapshotVisitor() {
664        @Override
665        public void storeFile(final RegionInfo regionInfo, final String family,
666          final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
667          Pair<SnapshotFileInfo, Long> snapshotFileAndSize = null;
668          if (!storeFile.hasReference()) {
669            String region = regionInfo.getEncodedName();
670            String hfile = storeFile.getName();
671            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, region, family, hfile,
672              storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
673          } else {
674            Pair<String, String> referredToRegionAndFile =
675              StoreFileInfo.getReferredToRegionAndFile(storeFile.getName());
676            String referencedRegion = referredToRegionAndFile.getFirst();
677            String referencedHFile = referredToRegionAndFile.getSecond();
678            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, referencedRegion, family,
679              referencedHFile, storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
680          }
681          files.add(snapshotFileAndSize);
682        }
683      });
684
685    return files;
686  }
687
688  private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs,
689    Configuration conf, TableName table, String region, String family, String hfile, long size)
690    throws IOException {
691    Path path = HFileLink.createPath(table, region, family, hfile);
692    SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
693      .setHfile(path.toString()).build();
694    if (size == -1) {
695      size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
696    }
697    return new Pair<>(fileInfo, size);
698  }
699
700  /**
701   * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
702   * The groups created will have similar amounts of bytes.
703   * <p>
704   * The algorithm used is pretty straightforward; the file list is sorted by size, and then each
705   * group fetch the bigger file available, iterating through groups alternating the direction.
706   */
707  static List<List<Pair<SnapshotFileInfo, Long>>>
708    getBalancedSplits(final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
709    // Sort files by size, from small to big
710    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
711      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
712        long r = a.getSecond() - b.getSecond();
713        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
714      }
715    });
716
717    // create balanced groups
718    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
719    long[] sizeGroups = new long[ngroups];
720    int hi = files.size() - 1;
721    int lo = 0;
722
723    List<Pair<SnapshotFileInfo, Long>> group;
724    int dir = 1;
725    int g = 0;
726
727    while (hi >= lo) {
728      if (g == fileGroups.size()) {
729        group = new LinkedList<>();
730        fileGroups.add(group);
731      } else {
732        group = fileGroups.get(g);
733      }
734
735      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
736
737      // add the hi one
738      sizeGroups[g] += fileInfo.getSecond();
739      group.add(fileInfo);
740
741      // change direction when at the end or the beginning
742      g += dir;
743      if (g == ngroups) {
744        dir = -1;
745        g = ngroups - 1;
746      } else if (g < 0) {
747        dir = 1;
748        g = 0;
749      }
750    }
751
752    if (LOG.isDebugEnabled()) {
753      for (int i = 0; i < sizeGroups.length; ++i) {
754        LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
755      }
756    }
757
758    return fileGroups;
759  }
760
761  private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
762    @Override
763    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
764      TaskAttemptContext tac) throws IOException, InterruptedException {
765      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys());
766    }
767
768    @Override
769    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
770      Configuration conf = context.getConfiguration();
771      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
772      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
773
774      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
775      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
776      if (mappers == 0 && snapshotFiles.size() > 0) {
777        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
778        mappers = Math.min(mappers, snapshotFiles.size());
779        conf.setInt(CONF_NUM_SPLITS, mappers);
780        conf.setInt(MR_NUM_MAPS, mappers);
781      }
782
783      List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
784      List<InputSplit> splits = new ArrayList(groups.size());
785      for (List<Pair<SnapshotFileInfo, Long>> files : groups) {
786        splits.add(new ExportSnapshotInputSplit(files));
787      }
788      return splits;
789    }
790
791    private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
792      private List<Pair<BytesWritable, Long>> files;
793      private long length;
794
795      public ExportSnapshotInputSplit() {
796        this.files = null;
797      }
798
799      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
800        this.files = new ArrayList(snapshotFiles.size());
801        for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) {
802          this.files.add(
803            new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
804          this.length += fileInfo.getSecond();
805        }
806      }
807
808      private List<Pair<BytesWritable, Long>> getSplitKeys() {
809        return files;
810      }
811
812      @Override
813      public long getLength() throws IOException, InterruptedException {
814        return length;
815      }
816
817      @Override
818      public String[] getLocations() throws IOException, InterruptedException {
819        return new String[] {};
820      }
821
822      @Override
823      public void readFields(DataInput in) throws IOException {
824        int count = in.readInt();
825        files = new ArrayList<>(count);
826        length = 0;
827        for (int i = 0; i < count; ++i) {
828          BytesWritable fileInfo = new BytesWritable();
829          fileInfo.readFields(in);
830          long size = in.readLong();
831          files.add(new Pair<>(fileInfo, size));
832          length += size;
833        }
834      }
835
836      @Override
837      public void write(DataOutput out) throws IOException {
838        out.writeInt(files.size());
839        for (final Pair<BytesWritable, Long> fileInfo : files) {
840          fileInfo.getFirst().write(out);
841          out.writeLong(fileInfo.getSecond());
842        }
843      }
844    }
845
846    private static class ExportSnapshotRecordReader
847      extends RecordReader<BytesWritable, NullWritable> {
848      private final List<Pair<BytesWritable, Long>> files;
849      private long totalSize = 0;
850      private long procSize = 0;
851      private int index = -1;
852
853      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
854        this.files = files;
855        for (Pair<BytesWritable, Long> fileInfo : files) {
856          totalSize += fileInfo.getSecond();
857        }
858      }
859
860      @Override
861      public void close() {
862      }
863
864      @Override
865      public BytesWritable getCurrentKey() {
866        return files.get(index).getFirst();
867      }
868
869      @Override
870      public NullWritable getCurrentValue() {
871        return NullWritable.get();
872      }
873
874      @Override
875      public float getProgress() {
876        return (float) procSize / totalSize;
877      }
878
879      @Override
880      public void initialize(InputSplit split, TaskAttemptContext tac) {
881      }
882
883      @Override
884      public boolean nextKeyValue() {
885        if (index >= 0) {
886          procSize += files.get(index).getSecond();
887        }
888        return (++index < files.size());
889      }
890    }
891  }
892
893  // ==========================================================================
894  // Tool
895  // ==========================================================================
896
897  /**
898   * Run Map-Reduce Job to perform the files copy.
899   */
900  private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName,
901    final Path snapshotDir, final boolean verifyChecksum, final String filesUser,
902    final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB)
903    throws IOException, InterruptedException, ClassNotFoundException {
904    Configuration conf = getConf();
905    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
906    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
907    if (mappers > 0) {
908      conf.setInt(CONF_NUM_SPLITS, mappers);
909      conf.setInt(MR_NUM_MAPS, mappers);
910    }
911    conf.setInt(CONF_FILES_MODE, filesMode);
912    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
913    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
914    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
915    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
916    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
917    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
918
919    String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
920    Job job = new Job(conf);
921    job.setJobName(jobname);
922    job.setJarByClass(ExportSnapshot.class);
923    TableMapReduceUtil.addDependencyJars(job);
924    job.setMapperClass(ExportMapper.class);
925    job.setInputFormatClass(ExportSnapshotInputFormat.class);
926    job.setOutputFormatClass(NullOutputFormat.class);
927    job.setMapSpeculativeExecution(false);
928    job.setNumReduceTasks(0);
929
930    // Acquire the delegation Tokens
931    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
932    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf);
933    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
934    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf);
935
936    // Run the MR Job
937    if (!job.waitForCompletion(true)) {
938      throw new ExportSnapshotException(job.getStatus().getFailureInfo());
939    }
940  }
941
942  private void verifySnapshot(final SnapshotDescription snapshotDesc, final Configuration baseConf,
943    final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
944    // Update the conf with the current root dir, since may be a different cluster
945    Configuration conf = new Configuration(baseConf);
946    CommonFSUtils.setRootDir(conf, rootDir);
947    CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf));
948    boolean isExpired = SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDesc.getTtl(),
949      snapshotDesc.getCreationTime(), EnvironmentEdgeManager.currentTime());
950    if (isExpired) {
951      throw new SnapshotTTLExpiredException(ProtobufUtil.createSnapshotDesc(snapshotDesc));
952    }
953    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
954  }
955
956  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
957    BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
958    ExecutorService pool = Executors
959      .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
960    List<Future<Void>> futures = new ArrayList<>();
961    for (Path dstPath : traversedPath) {
962      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
963      futures.add(future);
964    }
965    try {
966      for (Future<Void> future : futures) {
967        future.get();
968      }
969    } catch (InterruptedException | ExecutionException e) {
970      throw new IOException(e);
971    } finally {
972      pool.shutdownNow();
973    }
974  }
975
976  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
977    Configuration conf, List<Path> traversedPath) throws IOException {
978    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
979      try {
980        fs.setOwner(path, filesUser, filesGroup);
981      } catch (IOException e) {
982        throw new RuntimeException(
983          "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e);
984      }
985    }, conf);
986  }
987
988  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
989    final List<Path> traversedPath, final Configuration conf) throws IOException {
990    if (filesMode <= 0) {
991      return;
992    }
993    FsPermission perm = new FsPermission(filesMode);
994    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
995      try {
996        fs.setPermission(path, perm);
997      } catch (IOException e) {
998        throw new RuntimeException(
999          "set permission for file " + path + " to " + filesMode + " failed", e);
1000      }
1001    }, conf);
1002  }
1003
1004  private boolean verifyTarget = true;
1005  private boolean verifySource = true;
1006  private boolean verifyChecksum = true;
1007  private String snapshotName = null;
1008  private String targetName = null;
1009  private boolean overwrite = false;
1010  private String filesGroup = null;
1011  private String filesUser = null;
1012  private Path outputRoot = null;
1013  private Path inputRoot = null;
1014  private int bandwidthMB = Integer.MAX_VALUE;
1015  private int filesMode = 0;
1016  private int mappers = 0;
1017  private boolean resetTtl = false;
1018
1019  @Override
1020  protected void processOptions(CommandLine cmd) {
1021    snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName);
1022    targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName);
1023    if (cmd.hasOption(Options.COPY_TO.getLongOpt())) {
1024      outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt()));
1025    }
1026    if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
1027      inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
1028    }
1029    mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
1030    filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
1031    filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
1032    filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8);
1033    bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB);
1034    overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt());
1035    // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...).
1036    verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
1037    verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
1038    verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt());
1039    resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt());
1040  }
1041
1042  /**
1043   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
1044   * @return 0 on success, and != 0 upon failure.
1045   */
1046  @Override
1047  public int doWork() throws IOException {
1048    Configuration conf = getConf();
1049
1050    // Check user options
1051    if (snapshotName == null) {
1052      System.err.println("Snapshot name not provided.");
1053      LOG.error("Use -h or --help for usage instructions.");
1054      return EXIT_FAILURE;
1055    }
1056
1057    if (outputRoot == null) {
1058      System.err
1059        .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided.");
1060      LOG.error("Use -h or --help for usage instructions.");
1061      return EXIT_FAILURE;
1062    }
1063
1064    if (targetName == null) {
1065      targetName = snapshotName;
1066    }
1067    if (inputRoot == null) {
1068      inputRoot = CommonFSUtils.getRootDir(conf);
1069    } else {
1070      CommonFSUtils.setRootDir(conf, inputRoot);
1071    }
1072
1073    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
1074    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
1075    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
1076    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
1077    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false)
1078      || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null;
1079    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
1080    Path snapshotTmpDir =
1081      SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf);
1082    Path outputSnapshotDir =
1083      SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
1084    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
1085    LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot);
1086    LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs,
1087      outputRoot.toString(), skipTmp, initialOutputSnapshotDir);
1088
1089    // throw CorruptedSnapshotException if we can't read the snapshot info.
1090    SnapshotDescription sourceSnapshotDesc =
1091      SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir);
1092
1093    // Verify snapshot source before copying files
1094    if (verifySource) {
1095      LOG.info("Verify the source snapshot's expiration status and integrity.");
1096      verifySnapshot(sourceSnapshotDesc, srcConf, inputFs, inputRoot, snapshotDir);
1097    }
1098
1099    // Find the necessary directory which need to change owner and group
1100    Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
1101    if (outputFs.exists(needSetOwnerDir)) {
1102      if (skipTmp) {
1103        needSetOwnerDir = outputSnapshotDir;
1104      } else {
1105        needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf);
1106        if (outputFs.exists(needSetOwnerDir)) {
1107          needSetOwnerDir = snapshotTmpDir;
1108        }
1109      }
1110    }
1111
1112    // Check if the snapshot already exists
1113    if (outputFs.exists(outputSnapshotDir)) {
1114      if (overwrite) {
1115        if (!outputFs.delete(outputSnapshotDir, true)) {
1116          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
1117          return EXIT_FAILURE;
1118        }
1119      } else {
1120        System.err.println("The snapshot '" + targetName + "' already exists in the destination: "
1121          + outputSnapshotDir);
1122        return EXIT_FAILURE;
1123      }
1124    }
1125
1126    if (!skipTmp) {
1127      // Check if the snapshot already in-progress
1128      if (outputFs.exists(snapshotTmpDir)) {
1129        if (overwrite) {
1130          if (!outputFs.delete(snapshotTmpDir, true)) {
1131            System.err
1132              .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir);
1133            return EXIT_FAILURE;
1134          }
1135        } else {
1136          System.err
1137            .println("A snapshot with the same name '" + targetName + "' may be in-progress");
1138          System.err
1139            .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, ");
1140          System.err
1141            .println("consider removing " + snapshotTmpDir + " by using the -overwrite option");
1142          return EXIT_FAILURE;
1143        }
1144      }
1145    }
1146
1147    // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
1148    // The snapshot references must be copied before the hfiles otherwise the cleaner
1149    // will remove them because they are unreferenced.
1150    List<Path> travesedPaths = new ArrayList<>();
1151    boolean copySucceeded = false;
1152    try {
1153      LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir);
1154      travesedPaths =
1155        FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
1156          conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
1157      copySucceeded = true;
1158    } catch (IOException e) {
1159      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir
1160        + " to=" + initialOutputSnapshotDir, e);
1161    } finally {
1162      if (copySucceeded) {
1163        if (filesUser != null || filesGroup != null) {
1164          LOG.warn(
1165            (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser)
1166              + (filesGroup == null
1167                ? ""
1168                : ", Change the group of " + needSetOwnerDir + " to " + filesGroup));
1169          setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths);
1170        }
1171        if (filesMode > 0) {
1172          LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
1173          setPermissionParallel(outputFs, (short) filesMode, travesedPaths, conf);
1174        }
1175      }
1176    }
1177
1178    // Write a new .snapshotinfo if the target name is different from the source name or we want to
1179    // reset TTL for target snapshot.
1180    if (!targetName.equals(snapshotName) || resetTtl) {
1181      SnapshotDescription.Builder snapshotDescBuilder =
1182        SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder();
1183      if (!targetName.equals(snapshotName)) {
1184        snapshotDescBuilder.setName(targetName);
1185      }
1186      if (resetTtl) {
1187        snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL);
1188      }
1189      SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(),
1190        initialOutputSnapshotDir, outputFs);
1191      if (filesUser != null || filesGroup != null) {
1192        outputFs.setOwner(
1193          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser,
1194          filesGroup);
1195      }
1196      if (filesMode > 0) {
1197        outputFs.setPermission(
1198          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE),
1199          new FsPermission((short) filesMode));
1200      }
1201    }
1202
1203    // Step 2 - Start MR Job to copy files
1204    // The snapshot references must be copied before the files otherwise the files gets removed
1205    // by the HFileArchiver, since they have no references.
1206    try {
1207      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser,
1208        filesGroup, filesMode, mappers, bandwidthMB);
1209
1210      LOG.info("Finalize the Snapshot Export");
1211      if (!skipTmp) {
1212        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
1213        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1214          throw new ExportSnapshotException("Unable to rename snapshot directory from="
1215            + snapshotTmpDir + " to=" + outputSnapshotDir);
1216        }
1217      }
1218
1219      // Step 4 - Verify snapshot integrity
1220      if (verifyTarget) {
1221        LOG.info("Verify the exported snapshot's expiration status and integrity.");
1222        SnapshotDescription targetSnapshotDesc =
1223          SnapshotDescriptionUtils.readSnapshotInfo(outputFs, outputSnapshotDir);
1224        verifySnapshot(targetSnapshotDesc, destConf, outputFs, outputRoot, outputSnapshotDir);
1225      }
1226
1227      LOG.info("Export Completed: " + targetName);
1228      return EXIT_SUCCESS;
1229    } catch (Exception e) {
1230      LOG.error("Snapshot export failed", e);
1231      if (!skipTmp) {
1232        outputFs.delete(snapshotTmpDir, true);
1233      }
1234      outputFs.delete(outputSnapshotDir, true);
1235      return EXIT_FAILURE;
1236    }
1237  }
1238
1239  @Override
1240  protected void printUsage() {
1241    super.printUsage();
1242    System.out.println("\n" + "Examples:\n" + "  hbase snapshot export \\\n"
1243      + "    --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n"
1244      + "    --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" + "\n"
1245      + "  hbase snapshot export \\\n"
1246      + "    --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n"
1247      + "    --copy-to hdfs://srv1:50070/hbase");
1248  }
1249
1250  @Override
1251  protected void addOptions() {
1252    addRequiredOption(Options.SNAPSHOT);
1253    addOption(Options.COPY_TO);
1254    addOption(Options.COPY_FROM);
1255    addOption(Options.TARGET_NAME);
1256    addOption(Options.NO_CHECKSUM_VERIFY);
1257    addOption(Options.NO_TARGET_VERIFY);
1258    addOption(Options.NO_SOURCE_VERIFY);
1259    addOption(Options.OVERWRITE);
1260    addOption(Options.CHUSER);
1261    addOption(Options.CHGROUP);
1262    addOption(Options.CHMOD);
1263    addOption(Options.MAPPERS);
1264    addOption(Options.BANDWIDTH);
1265    addOption(Options.RESET_TTL);
1266  }
1267
1268  public static void main(String[] args) {
1269    new ExportSnapshot().doStaticMain(args);
1270  }
1271}