/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.snapshot;

import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.FileLink;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;

import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;

/**
 * Export the specified snapshot to a given FileSystem. The .snapshot/name folder is copied to the
 * destination cluster, and then all the hfiles/wals are copied, using a Map-Reduce job, into the
 * .archive/ location. When everything is done, the destination cluster can restore the snapshot.
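 * <p>
 * Example invocation, exporting a snapshot to another HDFS cluster (hostnames, ports and paths
 * below are illustrative only; see the tool's usage output for the full option list):
 *
 * <pre>
 * hbase snapshot export \
 *   --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase --mappers 16
 * </pre>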
 */
@InterfaceAudience.Public
public class ExportSnapshot extends AbstractHBaseTool implements Tool {
  public static final String NAME = "exportsnapshot";
  /** Configuration prefix for overrides for the source filesystem */
  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
  /** Configuration prefix for overrides for the destination filesystem */
  public static final String CONF_DEST_PREFIX = NAME + ".to.";
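  // Properties carrying these prefixes are applied, with the prefix stripped, to the source or
  // destination cluster configuration only (see the HBaseConfiguration.createClusterConf calls
  // below). For example, a hypothetical "exportsnapshot.to.fs.defaultFS" property would override
  // "fs.defaultFS" just for the destination FileSystem.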

  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);

  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
  private static final String CONF_COPY_MANIFEST_THREADS =
    "snapshot.export.copy.references.threads";
  private static final int DEFAULT_COPY_MANIFEST_THREADS =
    Runtime.getRuntime().availableProcessors();

  static class Testing {
    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
    int failuresCountToInject = 0;
    int injectedFailureCount = 0;
  }

  // Command line options and defaults.
  static final class Options {
    static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to export.");
    static final Option TARGET_NAME =
      new Option(null, "target", true, "Target name for the snapshot.");
    static final Option COPY_TO =
      new Option(null, "copy-to", true, "Remote destination hdfs://");
    static final Option COPY_FROM =
      new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)");
    static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
      "Do not verify checksum, use name+length only.");
    static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
      "Do not verify the integrity of the exported snapshot.");
    static final Option NO_SOURCE_VERIFY =
      new Option(null, "no-source-verify", false, "Do not verify the source of the snapshot.");
    static final Option OVERWRITE =
      new Option(null, "overwrite", false, "Rewrite the snapshot manifest if it already exists.");
    static final Option CHUSER =
      new Option(null, "chuser", true, "Change the owner of the files to the specified one.");
    static final Option CHGROUP =
      new Option(null, "chgroup", true, "Change the group of the files to the specified one.");
    static final Option CHMOD =
      new Option(null, "chmod", true, "Change the permission of the files to the specified one.");
    static final Option MAPPERS = new Option(null, "mappers", true,
      "Number of mappers to use during the copy (mapreduce.job.maps).");
    static final Option BANDWIDTH =
      new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second.");
    static final Option RESET_TTL =
      new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot");
  }

  // Export Map-Reduce Counters, to keep track of the progress
  public enum Counter {
    MISSING_FILES,
    FILES_COPIED,
    FILES_SKIPPED,
    COPY_FAILED,
    BYTES_EXPECTED,
    BYTES_SKIPPED,
    BYTES_COPIED
  }

  private static class ExportMapper
    extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
    private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
    final static int REPORT_SIZE = 1 * 1024 * 1024;
    final static int BUFFER_SIZE = 64 * 1024;

    private boolean verifyChecksum;
    private String filesGroup;
    private String filesUser;
    private short filesMode;
    private int bufferSize;

    private FileSystem outputFs;
    private Path outputArchive;
    private Path outputRoot;

    private FileSystem inputFs;
    private Path inputArchive;
    private Path inputRoot;

    private static Testing testing = new Testing();

    @Override
    public void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();

      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);

      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);

      filesGroup = conf.get(CONF_FILES_GROUP);
      filesUser = conf.get(CONF_FILES_USER);
      filesMode = (short) conf.getInt(CONF_FILES_MODE, 0);
      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));

      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);

      try {
        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
      } catch (IOException e) {
        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
      }

      try {
        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
      } catch (IOException e) {
        throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e);
      }

      // Use the default block size of the outputFs if bigger
      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
      LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));

      for (Counter c : Counter.values()) {
        context.getCounter(c).increment(0);
      }
      if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
        testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
        // Get number of times we have already injected failure based on attempt number of this
        // task.
        testing.injectedFailureCount = context.getTaskAttemptID().getId();
      }
    }

    @Override
    public void map(BytesWritable key, NullWritable value, Context context)
      throws InterruptedException, IOException {
      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
      Path outputPath = getOutputPath(inputInfo);

      copyFile(context, inputInfo, outputPath);
    }

    /**
     * Returns the location where the inputPath will be copied.
     */
    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
      Path path = null;
      switch (inputInfo.getType()) {
        case HFILE:
          Path inputPath = new Path(inputInfo.getHfile());
          String family = inputPath.getParent().getName();
          TableName table = HFileLink.getReferencedTableName(inputPath.getName());
          String region = HFileLink.getReferencedRegionName(inputPath.getName());
          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
          path = new Path(CommonFSUtils.getTableDir(new Path("./"), table),
            new Path(region, new Path(family, hfile)));
          break;
        case WAL:
          LOG.warn("snapshot does not keep WALs: " + inputInfo);
          break;
        default:
          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
      }
      return new Path(outputArchive, path);
    }

    @SuppressWarnings("checkstyle:linelength")
    /**
     * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in
     * {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}.
     */
    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
      throws IOException {
      if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return;
      if (testing.injectedFailureCount >= testing.failuresCountToInject) return;
      testing.injectedFailureCount++;
      context.getCounter(Counter.COPY_FAILED).increment(1);
      LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
      throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s",
        testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
    }

    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
      final Path outputPath) throws IOException {
      // Get the file information
      FileStatus inputStat = getSourceFileStatus(context, inputInfo);

      // Check whether the output file already exists and is identical to the one we want to copy
      if (outputFs.exists(outputPath)) {
        FileStatus outputStat = outputFs.getFileStatus(outputPath);
        if (outputStat != null && sameFile(inputStat, outputStat)) {
          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
          context.getCounter(Counter.FILES_SKIPPED).increment(1);
          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
          return;
        }
      }

      InputStream in = openSourceFile(context, inputInfo);
      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
      if (Integer.MAX_VALUE != bandwidthMB) {
        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
      }

      try {
        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());

        // Ensure that the output folder is there and copy the file
        createOutputPath(outputPath.getParent());
        FSDataOutputStream out = outputFs.create(outputPath, true);
        try {
          copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
        } finally {
          out.close();
        }

        // Try to preserve attributes
        if (!preserveAttributes(outputPath, inputStat)) {
          LOG.warn("You may have to manually run chown on: " + outputPath);
        }
      } finally {
        in.close();
        injectTestFailure(context, inputInfo);
      }
    }

    /**
     * Create the output folder and optionally set ownership.
     */
    private void createOutputPath(final Path path) throws IOException {
      if (filesUser == null && filesGroup == null) {
        outputFs.mkdirs(path);
      } else {
        Path parent = path.getParent();
        if (!outputFs.exists(parent) && !parent.isRoot()) {
          createOutputPath(parent);
        }
        outputFs.mkdirs(path);
        if (filesUser != null || filesGroup != null) {
          // override the owner when non-null user/group is specified
          outputFs.setOwner(path, filesUser, filesGroup);
        }
        if (filesMode > 0) {
          outputFs.setPermission(path, new FsPermission(filesMode));
        }
      }
    }

    /**
     * Try to preserve the file attributes selected by the user, copying them from the source file.
     * This is only required when exporting as a user other than "hbase", or on a system that does
     * not have an "hbase" user. This is not considered a blocking failure, since the user can
     * always fix ownership and permissions manually with a user/group known to exist on the
     * destination system.
     */
    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
      FileStatus stat;
      try {
        stat = outputFs.getFileStatus(path);
      } catch (IOException e) {
        LOG.warn("Unable to get the status for file=" + path);
        return false;
      }

      try {
        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
          outputFs.setPermission(path, new FsPermission(filesMode));
        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
          outputFs.setPermission(path, refStat.getPermission());
        }
      } catch (IOException e) {
        LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage());
        return false;
      }

      boolean hasRefStat = (refStat != null);
      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
        try {
          if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
            outputFs.setOwner(path, user, group);
          }
        } catch (IOException e) {
          LOG.warn(
            "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage());
          LOG.warn("The user/group may not exist on the destination cluster: user=" + user
            + " group=" + group);
          return false;
        }
      }

      return true;
    }

    private boolean stringIsNotEmpty(final String str) {
      return str != null && str.length() > 0;
    }

    private void copyData(final Context context, final Path inputPath, final InputStream in,
      final Path outputPath, final FSDataOutputStream out, final long inputFileSize)
      throws IOException {
      final String statusMessage =
        "copied %s/" + StringUtils.humanReadableInt(inputFileSize) + " (%.1f%%)";

      try {
        byte[] buffer = new byte[bufferSize];
        long totalBytesWritten = 0;
        int reportBytes = 0;
        int bytesRead;

        long stime = EnvironmentEdgeManager.currentTime();
        while ((bytesRead = in.read(buffer)) > 0) {
          out.write(buffer, 0, bytesRead);
          totalBytesWritten += bytesRead;
          reportBytes += bytesRead;

          if (reportBytes >= REPORT_SIZE) {
            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
            context.setStatus(
              String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten),
                (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath
                + " to " + outputPath);
            reportBytes = 0;
          }
        }
        long etime = EnvironmentEdgeManager.currentTime();

        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
        context
          .setStatus(String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten),
            (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to "
            + outputPath);

        // Verify that the written size matches
        if (totalBytesWritten != inputFileSize) {
          String msg = "number of bytes copied does not match: copied=" + totalBytesWritten
            + " expected=" + inputFileSize + " for file=" + inputPath;
          throw new IOException(msg);
        }

        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
        LOG
          .info("size=" + totalBytesWritten + " (" + StringUtils.humanReadableInt(totalBytesWritten)
            + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String
              .format(" %.3fM/sec", (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0));
        context.getCounter(Counter.FILES_COPIED).increment(1);
      } catch (IOException e) {
        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
        context.getCounter(Counter.COPY_FAILED).increment(1);
        throw e;
      }
    }

    /**
     * Try to open the "source" file. Throws an IOException if the communication with the inputFs
     * fails or if the file is not found.
     */
    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
      throws IOException {
      try {
        Configuration conf = context.getConfiguration();
        FileLink link = null;
        switch (fileInfo.getType()) {
          case HFILE:
            Path inputPath = new Path(fileInfo.getHfile());
            link = getFileLink(inputPath, conf);
            break;
          case WAL:
            String serverName = fileInfo.getWalServer();
            String logName = fileInfo.getWalName();
            link = new WALLink(inputRoot, serverName, logName);
            break;
          default:
            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
        }
        return link.open(inputFs);
      } catch (IOException e) {
        context.getCounter(Counter.MISSING_FILES).increment(1);
        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
        throw e;
      }
    }

    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
      throws IOException {
      try {
        Configuration conf = context.getConfiguration();
        FileLink link = null;
        switch (fileInfo.getType()) {
          case HFILE:
            Path inputPath = new Path(fileInfo.getHfile());
            link = getFileLink(inputPath, conf);
            break;
          case WAL:
            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
            break;
          default:
            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
        }
        return link.getFileStatus(inputFs);
      } catch (FileNotFoundException e) {
        context.getCounter(Counter.MISSING_FILES).increment(1);
        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
        throw e;
      } catch (IOException e) {
        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
        throw e;
      }
    }

    private FileLink getFileLink(Path path, Configuration conf) throws IOException {
      String regionName = HFileLink.getReferencedRegionName(path.getName());
      TableName tableName = HFileLink.getReferencedTableName(path.getName());
      if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
          HFileArchiveUtil.getArchivePath(conf), path);
      }
      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
    }

    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
      try {
        return fs.getFileChecksum(path);
      } catch (IOException e) {
        LOG.warn("Unable to get checksum for file=" + path, e);
        return null;
      }
    }

    /**
     * Check whether the two files are equal by comparing the file length and, if the user enabled
     * the verifyChecksum flag, the file checksum.
     */
    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
      // Not matching length
      if (inputStat.getLen() != outputStat.getLen()) return false;

      // Consider the files equal, since the user asked to skip checksum verification
      if (!verifyChecksum) return true;

      // If checksums are not available, files are not the same.
      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
      if (inChecksum == null) return false;

      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
      if (outChecksum == null) return false;

      return inChecksum.equals(outChecksum);
    }
  }

  // ==========================================================================
  // Input Format
  // ==========================================================================

  /**
   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
   * @return list of files referenced by the snapshot (pair of path and size)
   */
  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
    final FileSystem fs, final Path snapshotDir) throws IOException {
    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);

    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
    final TableName table = TableName.valueOf(snapshotDesc.getTable());

    // Get snapshot files
    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
      new SnapshotReferenceUtil.SnapshotVisitor() {
        @Override
        public void storeFile(final RegionInfo regionInfo, final String family,
          final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
          Pair<SnapshotFileInfo, Long> snapshotFileAndSize = null;
          if (!storeFile.hasReference()) {
            String region = regionInfo.getEncodedName();
            String hfile = storeFile.getName();
            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, region, family, hfile,
              storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
          } else {
            Pair<String, String> referredToRegionAndFile =
              StoreFileInfo.getReferredToRegionAndFile(storeFile.getName());
            String referencedRegion = referredToRegionAndFile.getFirst();
            String referencedHFile = referredToRegionAndFile.getSecond();
            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, referencedRegion, family,
              referencedHFile, storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
          }
          files.add(snapshotFileAndSize);
        }
      });

    return files;
  }

  private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs,
    Configuration conf, TableName table, String region, String family, String hfile, long size)
    throws IOException {
    Path path = HFileLink.createPath(table, region, family, hfile);
    SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
      .setHfile(path.toString()).build();
    if (size == -1) {
      size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
    }
    return new Pair<>(fileInfo, size);
  }

  /**
   * Given a list of file paths and sizes, create up to ngroups groups, balanced so that each group
   * ends up with a similar number of bytes.
   * <p>
   * The algorithm is straightforward: the file list is sorted by size, and then each group in turn
   * takes the biggest file still available, sweeping back and forth across the groups.
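   * <p>
   * For example (illustrative sizes only), splitting six files of sizes 10, 9, 8, 7, 6 and 5 into
   * three groups yields {10, 5}, {9, 6} and {8, 7}, i.e. 15 bytes per group:
   *
   * <pre>
   * List&lt;Pair&lt;SnapshotFileInfo, Long&gt;&gt; files = ...; // sizes 5, 6, 7, 8, 9, 10
   * List&lt;List&lt;Pair&lt;SnapshotFileInfo, Long&gt;&gt;&gt; groups = getBalancedSplits(files, 3);
   * // groups.get(0) holds the files of size 10 and 5, groups.get(1) 9 and 6, groups.get(2) 8 and 7
   * </pre>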
   */
  static List<List<Pair<SnapshotFileInfo, Long>>>
    getBalancedSplits(final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
    // Sort files by size, from small to big
    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
        long r = a.getSecond() - b.getSecond();
        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
      }
    });

    // create balanced groups
    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
    long[] sizeGroups = new long[ngroups];
    int hi = files.size() - 1;
    int lo = 0;

    List<Pair<SnapshotFileInfo, Long>> group;
    int dir = 1;
    int g = 0;

    while (hi >= lo) {
      if (g == fileGroups.size()) {
        group = new LinkedList<>();
        fileGroups.add(group);
      } else {
        group = fileGroups.get(g);
      }

      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);

      // assign the largest remaining file to the current group
      sizeGroups[g] += fileInfo.getSecond();
      group.add(fileInfo);

      // change direction when at the end or the beginning
      g += dir;
      if (g == ngroups) {
        dir = -1;
        g = ngroups - 1;
      } else if (g < 0) {
        dir = 1;
        g = 0;
      }
    }

    if (LOG.isDebugEnabled()) {
      for (int i = 0; i < sizeGroups.length; ++i) {
        LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
      }
    }

    return fileGroups;
  }

  private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
    @Override
    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
      TaskAttemptContext tac) throws IOException, InterruptedException {
      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys());
    }

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();
      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);

      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
      if (mappers == 0 && snapshotFiles.size() > 0) {
        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
        mappers = Math.min(mappers, snapshotFiles.size());
        conf.setInt(CONF_NUM_SPLITS, mappers);
        conf.setInt(MR_NUM_MAPS, mappers);
      }

      List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
      List<InputSplit> splits = new ArrayList<>(groups.size());
      for (List<Pair<SnapshotFileInfo, Long>> files : groups) {
        splits.add(new ExportSnapshotInputSplit(files));
      }
      return splits;
    }

    private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
      private List<Pair<BytesWritable, Long>> files;
      private long length;

      public ExportSnapshotInputSplit() {
        this.files = null;
      }

      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
        this.files = new ArrayList<>(snapshotFiles.size());
        for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) {
          this.files.add(
            new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
          this.length += fileInfo.getSecond();
        }
      }

      private List<Pair<BytesWritable, Long>> getSplitKeys() {
        return files;
      }

      @Override
      public long getLength() throws IOException, InterruptedException {
        return length;
      }

      @Override
      public String[] getLocations() throws IOException, InterruptedException {
        return new String[] {};
      }

      @Override
      public void readFields(DataInput in) throws IOException {
        int count = in.readInt();
        files = new ArrayList<>(count);
        length = 0;
        for (int i = 0; i < count; ++i) {
          BytesWritable fileInfo = new BytesWritable();
          fileInfo.readFields(in);
          long size = in.readLong();
          files.add(new Pair<>(fileInfo, size));
          length += size;
        }
      }

      @Override
      public void write(DataOutput out) throws IOException {
        out.writeInt(files.size());
        for (final Pair<BytesWritable, Long> fileInfo : files) {
          fileInfo.getFirst().write(out);
          out.writeLong(fileInfo.getSecond());
        }
      }
    }

    private static class ExportSnapshotRecordReader
      extends RecordReader<BytesWritable, NullWritable> {
      private final List<Pair<BytesWritable, Long>> files;
      private long totalSize = 0;
      private long procSize = 0;
      private int index = -1;

      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
        this.files = files;
        for (Pair<BytesWritable, Long> fileInfo : files) {
          totalSize += fileInfo.getSecond();
        }
      }

      @Override
      public void close() {
      }

      @Override
      public BytesWritable getCurrentKey() {
        return files.get(index).getFirst();
      }

      @Override
      public NullWritable getCurrentValue() {
        return NullWritable.get();
      }

      @Override
      public float getProgress() {
        return (float) procSize / totalSize;
      }

      @Override
      public void initialize(InputSplit split, TaskAttemptContext tac) {
      }

      @Override
      public boolean nextKeyValue() {
        if (index >= 0) {
          procSize += files.get(index).getSecond();
        }
        return (++index < files.size());
      }
    }
  }

  // ==========================================================================
  // Tool
  // ==========================================================================

  /**
   * Run the Map-Reduce job that performs the file copy.
   */
  private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName,
    final Path snapshotDir, final boolean verifyChecksum, final String filesUser,
    final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB)
    throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = getConf();
    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
    if (mappers > 0) {
      conf.setInt(CONF_NUM_SPLITS, mappers);
      conf.setInt(MR_NUM_MAPS, mappers);
    }
    conf.setInt(CONF_FILES_MODE, filesMode);
    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());

    String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
    Job job = new Job(conf);
    job.setJobName(jobname);
    job.setJarByClass(ExportSnapshot.class);
    TableMapReduceUtil.addDependencyJars(job);
    job.setMapperClass(ExportMapper.class);
    job.setInputFormatClass(ExportSnapshotInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setMapSpeculativeExecution(false);
    job.setNumReduceTasks(0);

    // Acquire the delegation Tokens
    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf);
    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf);

    // Run the MR Job
    if (!job.waitForCompletion(true)) {
      throw new ExportSnapshotException(job.getStatus().getFailureInfo());
    }
  }

  private void verifySnapshot(final Configuration baseConf, final FileSystem fs, final Path rootDir,
    final Path snapshotDir) throws IOException {
    // Update the conf with the current root dir, since it may be a different cluster
    Configuration conf = new Configuration(baseConf);
    CommonFSUtils.setRootDir(conf, rootDir);
    CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf));
    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
  }

  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
    BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
    ExecutorService pool = Executors
      .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
    List<Future<Void>> futures = new ArrayList<>();
    for (Path dstPath : traversedPath) {
      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
      futures.add(future);
    }
    try {
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (InterruptedException | ExecutionException e) {
      throw new IOException(e);
    } finally {
      pool.shutdownNow();
    }
  }

  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
    Configuration conf, List<Path> traversedPath) throws IOException {
    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
      try {
        fs.setOwner(path, filesUser, filesGroup);
      } catch (IOException e) {
        throw new RuntimeException(
          "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e);
      }
    }, conf);
  }

  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
    final List<Path> traversedPath, final Configuration conf) throws IOException {
    if (filesMode <= 0) {
      return;
    }
    FsPermission perm = new FsPermission(filesMode);
    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
      try {
        fs.setPermission(path, perm);
      } catch (IOException e) {
        throw new RuntimeException(
          "set permission for file " + path + " to " + filesMode + " failed", e);
      }
    }, conf);
  }

  private boolean verifyTarget = true;
  private boolean verifySource = true;
  private boolean verifyChecksum = true;
  private String snapshotName = null;
  private String targetName = null;
  private boolean overwrite = false;
  private String filesGroup = null;
  private String filesUser = null;
  private Path outputRoot = null;
  private Path inputRoot = null;
  private int bandwidthMB = Integer.MAX_VALUE;
  private int filesMode = 0;
  private int mappers = 0;
  private boolean resetTtl = false;

  @Override
  protected void processOptions(CommandLine cmd) {
    snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName);
    targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName);
    if (cmd.hasOption(Options.COPY_TO.getLongOpt())) {
      outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt()));
    }
    if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
      inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
    }
    mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
    filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
    filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
    filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8);
    bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB);
    overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt());
    // verifyChecksum, verifyTarget and verifySource default to true and are disabled by the
    // corresponding no-* options.
    verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
    verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
    verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt());
    resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt());
  }

  /**
   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
   * @return 0 on success, and != 0 upon failure.
   */
  @Override
  public int doWork() throws IOException {
    Configuration conf = getConf();

    // Check user options
    if (snapshotName == null) {
      System.err.println("Snapshot name not provided.");
      LOG.error("Use -h or --help for usage instructions.");
      return 0;
    }

    if (outputRoot == null) {
      System.err
        .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided.");
      LOG.error("Use -h or --help for usage instructions.");
      return 0;
    }

    if (targetName == null) {
      targetName = snapshotName;
    }
    if (inputRoot == null) {
      inputRoot = CommonFSUtils.getRootDir(conf);
    } else {
      CommonFSUtils.setRootDir(conf, inputRoot);
    }

    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false)
      || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null;
    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
    Path snapshotTmpDir =
      SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf);
    Path outputSnapshotDir =
      SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
    LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot);
    LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs,
      outputRoot.toString(), skipTmp, initialOutputSnapshotDir);

    // Verify snapshot source before copying files
    if (verifySource) {
      LOG.info("Verify snapshot source, inputFs={}, inputRoot={}, snapshotDir={}.",
        inputFs.getUri(), inputRoot, snapshotDir);
      verifySnapshot(srcConf, inputFs, inputRoot, snapshotDir);
    }

    // Find the directory whose owner and group need to be changed
    Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
    if (outputFs.exists(needSetOwnerDir)) {
      if (skipTmp) {
        needSetOwnerDir = outputSnapshotDir;
      } else {
        needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf);
        if (outputFs.exists(needSetOwnerDir)) {
          needSetOwnerDir = snapshotTmpDir;
        }
      }
    }

    // Check if the snapshot already exists
    if (outputFs.exists(outputSnapshotDir)) {
      if (overwrite) {
        if (!outputFs.delete(outputSnapshotDir, true)) {
          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
          return 1;
        }
      } else {
        System.err.println("The snapshot '" + targetName + "' already exists in the destination: "
          + outputSnapshotDir);
        return 1;
      }
    }

    if (!skipTmp) {
      // Check if the snapshot is already in progress
      if (outputFs.exists(snapshotTmpDir)) {
        if (overwrite) {
          if (!outputFs.delete(snapshotTmpDir, true)) {
            System.err
              .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir);
            return 1;
          }
        } else {
          System.err
            .println("A snapshot with the same name '" + targetName + "' may be in progress");
          System.err
            .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, ");
          System.err
            .println("consider removing " + snapshotTmpDir + " by using the --overwrite option");
          return 1;
        }
      }
    }

    // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
    // The snapshot references must be copied before the hfiles otherwise the cleaner
    // will remove them because they are unreferenced.
    List<Path> traversedPaths = new ArrayList<>();
    boolean copySucceeded = false;
    try {
      LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir);
      traversedPaths =
        FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
          conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
      copySucceeded = true;
    } catch (IOException e) {
      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir
        + " to=" + initialOutputSnapshotDir, e);
    } finally {
      if (copySucceeded) {
        if (filesUser != null || filesGroup != null) {
          LOG.warn(
            (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser)
              + (filesGroup == null
                ? ""
                : ", Change the group of " + needSetOwnerDir + " to " + filesGroup));
          setOwnerParallel(outputFs, filesUser, filesGroup, conf, traversedPaths);
        }
        if (filesMode > 0) {
          LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
          setPermissionParallel(outputFs, (short) filesMode, traversedPaths, conf);
        }
      }
    }

    // Write a new .snapshotinfo if the target name is different from the source name, or if we
    // want to reset the TTL for the target snapshot.
    if (!targetName.equals(snapshotName) || resetTtl) {
      SnapshotDescription.Builder snapshotDescBuilder =
        SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder();
      if (!targetName.equals(snapshotName)) {
        snapshotDescBuilder.setName(targetName);
      }
      if (resetTtl) {
        snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL);
      }
      SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(),
        initialOutputSnapshotDir, outputFs);
      if (filesUser != null || filesGroup != null) {
        outputFs.setOwner(
          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser,
          filesGroup);
      }
      if (filesMode > 0) {
        outputFs.setPermission(
          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE),
          new FsPermission((short) filesMode));
      }
    }

    // Step 2 - Start MR Job to copy files
    // The snapshot references must be copied before the files, otherwise the files get removed
    // by the HFileArchiver, since they have no references.
    try {
      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser,
        filesGroup, filesMode, mappers, bandwidthMB);

      LOG.info("Finalize the Snapshot Export");
      if (!skipTmp) {
        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> to fs2:/.snapshot/<snapshot>
        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
          throw new ExportSnapshotException("Unable to rename snapshot directory from="
            + snapshotTmpDir + " to=" + outputSnapshotDir);
        }
      }

      // Step 4 - Verify snapshot integrity
      if (verifyTarget) {
        LOG.info("Verify snapshot integrity");
        verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir);
      }

      LOG.info("Export Completed: " + targetName);
      return 0;
    } catch (Exception e) {
      LOG.error("Snapshot export failed", e);
      if (!skipTmp) {
        outputFs.delete(snapshotTmpDir, true);
      }
      outputFs.delete(outputSnapshotDir, true);
      return 1;
    }
  }

  @Override
  protected void printUsage() {
    super.printUsage();
    System.out.println("\n" + "Examples:\n" + "  hbase snapshot export \\\n"
      + "    --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n"
      + "    --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" + "\n"
      + "  hbase snapshot export \\\n"
      + "    --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n"
      + "    --copy-to hdfs://srv1:50070/hbase");
  }

  @Override
  protected void addOptions() {
    addRequiredOption(Options.SNAPSHOT);
    addOption(Options.COPY_TO);
    addOption(Options.COPY_FROM);
    addOption(Options.TARGET_NAME);
    addOption(Options.NO_CHECKSUM_VERIFY);
    addOption(Options.NO_TARGET_VERIFY);
    addOption(Options.NO_SOURCE_VERIFY);
    addOption(Options.OVERWRITE);
    addOption(Options.CHUSER);
    addOption(Options.CHGROUP);
    addOption(Options.CHMOD);
    addOption(Options.MAPPERS);
    addOption(Options.BANDWIDTH);
    addOption(Options.RESET_TTL);
  }

  public static void main(String[] args) {
    new ExportSnapshot().doStaticMain(args);
  }
}