001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.snapshot; 019 020import java.io.BufferedInputStream; 021import java.io.DataInput; 022import java.io.DataOutput; 023import java.io.FileNotFoundException; 024import java.io.IOException; 025import java.io.InputStream; 026import java.util.ArrayList; 027import java.util.Collections; 028import java.util.Comparator; 029import java.util.LinkedList; 030import java.util.List; 031import java.util.concurrent.ExecutionException; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.Executors; 034import java.util.concurrent.Future; 035import java.util.function.BiConsumer; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FSDataInputStream; 038import org.apache.hadoop.fs.FSDataOutputStream; 039import org.apache.hadoop.fs.FileChecksum; 040import org.apache.hadoop.fs.FileStatus; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.fs.permission.FsPermission; 044import org.apache.hadoop.hbase.HBaseConfiguration; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.io.FileLink; 049import org.apache.hadoop.hbase.io.HFileLink; 050import org.apache.hadoop.hbase.io.WALLink; 051import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream; 052import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 053import org.apache.hadoop.hbase.mob.MobUtils; 054import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 055import org.apache.hadoop.hbase.util.AbstractHBaseTool; 056import org.apache.hadoop.hbase.util.CommonFSUtils; 057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 058import org.apache.hadoop.hbase.util.FSUtils; 059import org.apache.hadoop.hbase.util.HFileArchiveUtil; 060import org.apache.hadoop.hbase.util.Pair; 061import org.apache.hadoop.io.BytesWritable; 062import org.apache.hadoop.io.NullWritable; 063import org.apache.hadoop.io.Writable; 064import org.apache.hadoop.mapreduce.InputFormat; 065import org.apache.hadoop.mapreduce.InputSplit; 066import org.apache.hadoop.mapreduce.Job; 067import org.apache.hadoop.mapreduce.JobContext; 068import org.apache.hadoop.mapreduce.Mapper; 069import org.apache.hadoop.mapreduce.RecordReader; 070import org.apache.hadoop.mapreduce.TaskAttemptContext; 071import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 072import org.apache.hadoop.mapreduce.security.TokenCache; 073import org.apache.hadoop.util.StringUtils; 074import org.apache.hadoop.util.Tool; 075import org.apache.yetus.audience.InterfaceAudience; 076import org.slf4j.Logger; 077import org.slf4j.LoggerFactory; 078 079import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 080import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; 081 082import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest; 085 086/** 087 * Export the specified snapshot to a given FileSystem. The .snapshot/name folder is copied to the 088 * destination cluster and then all the hfiles/wals are copied using a Map-Reduce Job in the 089 * .archive/ location. When everything is done, the second cluster can restore the snapshot. 090 */ 091@InterfaceAudience.Public 092public class ExportSnapshot extends AbstractHBaseTool implements Tool { 093 public static final String NAME = "exportsnapshot"; 094 /** Configuration prefix for overrides for the source filesystem */ 095 public static final String CONF_SOURCE_PREFIX = NAME + ".from."; 096 /** Configuration prefix for overrides for the destination filesystem */ 097 public static final String CONF_DEST_PREFIX = NAME + ".to."; 098 099 private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class); 100 101 private static final String MR_NUM_MAPS = "mapreduce.job.maps"; 102 private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits"; 103 private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name"; 104 private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir"; 105 private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user"; 106 private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group"; 107 private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode"; 108 private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify"; 109 private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root"; 110 private static final String CONF_INPUT_ROOT = "snapshot.export.input.root"; 111 private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size"; 112 private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group"; 113 private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb"; 114 private static final String CONF_MR_JOB_NAME = "mapreduce.job.name"; 115 protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp"; 116 private static final String CONF_COPY_MANIFEST_THREADS = 117 "snapshot.export.copy.references.threads"; 118 private static final int DEFAULT_COPY_MANIFEST_THREADS = 119 Runtime.getRuntime().availableProcessors(); 120 121 static class Testing { 122 static final String CONF_TEST_FAILURE = "test.snapshot.export.failure"; 123 static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count"; 124 int failuresCountToInject = 0; 125 int injectedFailureCount = 0; 126 } 127 128 // Command line options and defaults. 129 static final class Options { 130 static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore."); 131 static final Option TARGET_NAME = 132 new Option(null, "target", true, "Target name for the snapshot."); 133 static final Option COPY_TO = 134 new Option(null, "copy-to", true, "Remote " + "destination hdfs://"); 135 static final Option COPY_FROM = 136 new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)"); 137 static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false, 138 "Do not verify checksum, use name+length only."); 139 static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false, 140 "Do not verify the integrity of the exported snapshot."); 141 static final Option NO_SOURCE_VERIFY = 142 new Option(null, "no-source-verify", false, "Do not verify the source of the snapshot."); 143 static final Option OVERWRITE = 144 new Option(null, "overwrite", false, "Rewrite the snapshot manifest if already exists."); 145 static final Option CHUSER = 146 new Option(null, "chuser", true, "Change the owner of the files to the specified one."); 147 static final Option CHGROUP = 148 new Option(null, "chgroup", true, "Change the group of the files to the specified one."); 149 static final Option CHMOD = 150 new Option(null, "chmod", true, "Change the permission of the files to the specified one."); 151 static final Option MAPPERS = new Option(null, "mappers", true, 152 "Number of mappers to use during the copy (mapreduce.job.maps)."); 153 static final Option BANDWIDTH = 154 new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second."); 155 static final Option RESET_TTL = 156 new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot"); 157 } 158 159 // Export Map-Reduce Counters, to keep track of the progress 160 public enum Counter { 161 MISSING_FILES, 162 FILES_COPIED, 163 FILES_SKIPPED, 164 COPY_FAILED, 165 BYTES_EXPECTED, 166 BYTES_SKIPPED, 167 BYTES_COPIED 168 } 169 170 private static class ExportMapper 171 extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> { 172 private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class); 173 final static int REPORT_SIZE = 1 * 1024 * 1024; 174 final static int BUFFER_SIZE = 64 * 1024; 175 176 private boolean verifyChecksum; 177 private String filesGroup; 178 private String filesUser; 179 private short filesMode; 180 private int bufferSize; 181 182 private FileSystem outputFs; 183 private Path outputArchive; 184 private Path outputRoot; 185 186 private FileSystem inputFs; 187 private Path inputArchive; 188 private Path inputRoot; 189 190 private static Testing testing = new Testing(); 191 192 @Override 193 public void setup(Context context) throws IOException { 194 Configuration conf = context.getConfiguration(); 195 196 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 197 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 198 199 verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true); 200 201 filesGroup = conf.get(CONF_FILES_GROUP); 202 filesUser = conf.get(CONF_FILES_USER); 203 filesMode = (short) conf.getInt(CONF_FILES_MODE, 0); 204 outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT)); 205 inputRoot = new Path(conf.get(CONF_INPUT_ROOT)); 206 207 inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); 208 outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); 209 210 try { 211 inputFs = FileSystem.get(inputRoot.toUri(), srcConf); 212 } catch (IOException e) { 213 throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e); 214 } 215 216 try { 217 outputFs = FileSystem.get(outputRoot.toUri(), destConf); 218 } catch (IOException e) { 219 throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e); 220 } 221 222 // Use the default block size of the outputFs if bigger 223 int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE); 224 bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize); 225 LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize)); 226 227 for (Counter c : Counter.values()) { 228 context.getCounter(c).increment(0); 229 } 230 if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) { 231 testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0); 232 // Get number of times we have already injected failure based on attempt number of this 233 // task. 234 testing.injectedFailureCount = context.getTaskAttemptID().getId(); 235 } 236 } 237 238 @Override 239 public void map(BytesWritable key, NullWritable value, Context context) 240 throws InterruptedException, IOException { 241 SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes()); 242 Path outputPath = getOutputPath(inputInfo); 243 244 copyFile(context, inputInfo, outputPath); 245 } 246 247 /** 248 * Returns the location where the inputPath will be copied. 249 */ 250 private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException { 251 Path path = null; 252 switch (inputInfo.getType()) { 253 case HFILE: 254 Path inputPath = new Path(inputInfo.getHfile()); 255 String family = inputPath.getParent().getName(); 256 TableName table = HFileLink.getReferencedTableName(inputPath.getName()); 257 String region = HFileLink.getReferencedRegionName(inputPath.getName()); 258 String hfile = HFileLink.getReferencedHFileName(inputPath.getName()); 259 path = new Path(CommonFSUtils.getTableDir(new Path("./"), table), 260 new Path(region, new Path(family, hfile))); 261 break; 262 case WAL: 263 LOG.warn("snapshot does not keeps WALs: " + inputInfo); 264 break; 265 default: 266 throw new IOException("Invalid File Type: " + inputInfo.getType().toString()); 267 } 268 return new Path(outputArchive, path); 269 } 270 271 @SuppressWarnings("checkstyle:linelength") 272 /** 273 * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in 274 * {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}. 275 */ 276 private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo) 277 throws IOException { 278 if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return; 279 if (testing.injectedFailureCount >= testing.failuresCountToInject) return; 280 testing.injectedFailureCount++; 281 context.getCounter(Counter.COPY_FAILED).increment(1); 282 LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount); 283 throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s", 284 testing.injectedFailureCount, testing.failuresCountToInject, inputInfo)); 285 } 286 287 private void copyFile(final Context context, final SnapshotFileInfo inputInfo, 288 final Path outputPath) throws IOException { 289 // Get the file information 290 FileStatus inputStat = getSourceFileStatus(context, inputInfo); 291 292 // Verify if the output file exists and is the same that we want to copy 293 if (outputFs.exists(outputPath)) { 294 FileStatus outputStat = outputFs.getFileStatus(outputPath); 295 if (outputStat != null && sameFile(inputStat, outputStat)) { 296 LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file."); 297 context.getCounter(Counter.FILES_SKIPPED).increment(1); 298 context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen()); 299 return; 300 } 301 } 302 303 InputStream in = openSourceFile(context, inputInfo); 304 int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100); 305 if (Integer.MAX_VALUE != bandwidthMB) { 306 in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L); 307 } 308 309 try { 310 context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen()); 311 312 // Ensure that the output folder is there and copy the file 313 createOutputPath(outputPath.getParent()); 314 FSDataOutputStream out = outputFs.create(outputPath, true); 315 try { 316 copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen()); 317 } finally { 318 out.close(); 319 } 320 321 // Try to Preserve attributes 322 if (!preserveAttributes(outputPath, inputStat)) { 323 LOG.warn("You may have to run manually chown on: " + outputPath); 324 } 325 } finally { 326 in.close(); 327 injectTestFailure(context, inputInfo); 328 } 329 } 330 331 /** 332 * Create the output folder and optionally set ownership. 333 */ 334 private void createOutputPath(final Path path) throws IOException { 335 if (filesUser == null && filesGroup == null) { 336 outputFs.mkdirs(path); 337 } else { 338 Path parent = path.getParent(); 339 if (!outputFs.exists(parent) && !parent.isRoot()) { 340 createOutputPath(parent); 341 } 342 outputFs.mkdirs(path); 343 if (filesUser != null || filesGroup != null) { 344 // override the owner when non-null user/group is specified 345 outputFs.setOwner(path, filesUser, filesGroup); 346 } 347 if (filesMode > 0) { 348 outputFs.setPermission(path, new FsPermission(filesMode)); 349 } 350 } 351 } 352 353 /** 354 * Try to Preserve the files attribute selected by the user copying them from the source file 355 * This is only required when you are exporting as a different user than "hbase" or on a system 356 * that doesn't have the "hbase" user. This is not considered a blocking failure since the user 357 * can force a chmod with the user that knows is available on the system. 358 */ 359 private boolean preserveAttributes(final Path path, final FileStatus refStat) { 360 FileStatus stat; 361 try { 362 stat = outputFs.getFileStatus(path); 363 } catch (IOException e) { 364 LOG.warn("Unable to get the status for file=" + path); 365 return false; 366 } 367 368 try { 369 if (filesMode > 0 && stat.getPermission().toShort() != filesMode) { 370 outputFs.setPermission(path, new FsPermission(filesMode)); 371 } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) { 372 outputFs.setPermission(path, refStat.getPermission()); 373 } 374 } catch (IOException e) { 375 LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage()); 376 return false; 377 } 378 379 boolean hasRefStat = (refStat != null); 380 String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner(); 381 String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup(); 382 if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) { 383 try { 384 if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) { 385 outputFs.setOwner(path, user, group); 386 } 387 } catch (IOException e) { 388 LOG.warn( 389 "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage()); 390 LOG.warn("The user/group may not exist on the destination cluster: user=" + user 391 + " group=" + group); 392 return false; 393 } 394 } 395 396 return true; 397 } 398 399 private boolean stringIsNotEmpty(final String str) { 400 return str != null && str.length() > 0; 401 } 402 403 private void copyData(final Context context, final Path inputPath, final InputStream in, 404 final Path outputPath, final FSDataOutputStream out, final long inputFileSize) 405 throws IOException { 406 final String statusMessage = 407 "copied %s/" + StringUtils.humanReadableInt(inputFileSize) + " (%.1f%%)"; 408 409 try { 410 byte[] buffer = new byte[bufferSize]; 411 long totalBytesWritten = 0; 412 int reportBytes = 0; 413 int bytesRead; 414 415 long stime = EnvironmentEdgeManager.currentTime(); 416 while ((bytesRead = in.read(buffer)) > 0) { 417 out.write(buffer, 0, bytesRead); 418 totalBytesWritten += bytesRead; 419 reportBytes += bytesRead; 420 421 if (reportBytes >= REPORT_SIZE) { 422 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); 423 context.setStatus( 424 String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten), 425 (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath 426 + " to " + outputPath); 427 reportBytes = 0; 428 } 429 } 430 long etime = EnvironmentEdgeManager.currentTime(); 431 432 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); 433 context 434 .setStatus(String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten), 435 (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to " 436 + outputPath); 437 438 // Verify that the written size match 439 if (totalBytesWritten != inputFileSize) { 440 String msg = "number of bytes copied not matching copied=" + totalBytesWritten 441 + " expected=" + inputFileSize + " for file=" + inputPath; 442 throw new IOException(msg); 443 } 444 445 LOG.info("copy completed for input=" + inputPath + " output=" + outputPath); 446 LOG 447 .info("size=" + totalBytesWritten + " (" + StringUtils.humanReadableInt(totalBytesWritten) 448 + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String 449 .format(" %.3fM/sec", (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0)); 450 context.getCounter(Counter.FILES_COPIED).increment(1); 451 } catch (IOException e) { 452 LOG.error("Error copying " + inputPath + " to " + outputPath, e); 453 context.getCounter(Counter.COPY_FAILED).increment(1); 454 throw e; 455 } 456 } 457 458 /** 459 * Try to open the "source" file. Throws an IOException if the communication with the inputFs 460 * fail or if the file is not found. 461 */ 462 private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo) 463 throws IOException { 464 try { 465 Configuration conf = context.getConfiguration(); 466 FileLink link = null; 467 switch (fileInfo.getType()) { 468 case HFILE: 469 Path inputPath = new Path(fileInfo.getHfile()); 470 link = getFileLink(inputPath, conf); 471 break; 472 case WAL: 473 String serverName = fileInfo.getWalServer(); 474 String logName = fileInfo.getWalName(); 475 link = new WALLink(inputRoot, serverName, logName); 476 break; 477 default: 478 throw new IOException("Invalid File Type: " + fileInfo.getType().toString()); 479 } 480 return link.open(inputFs); 481 } catch (IOException e) { 482 context.getCounter(Counter.MISSING_FILES).increment(1); 483 LOG.error("Unable to open source file=" + fileInfo.toString(), e); 484 throw e; 485 } 486 } 487 488 private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo) 489 throws IOException { 490 try { 491 Configuration conf = context.getConfiguration(); 492 FileLink link = null; 493 switch (fileInfo.getType()) { 494 case HFILE: 495 Path inputPath = new Path(fileInfo.getHfile()); 496 link = getFileLink(inputPath, conf); 497 break; 498 case WAL: 499 link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName()); 500 break; 501 default: 502 throw new IOException("Invalid File Type: " + fileInfo.getType().toString()); 503 } 504 return link.getFileStatus(inputFs); 505 } catch (FileNotFoundException e) { 506 context.getCounter(Counter.MISSING_FILES).increment(1); 507 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e); 508 throw e; 509 } catch (IOException e) { 510 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e); 511 throw e; 512 } 513 } 514 515 private FileLink getFileLink(Path path, Configuration conf) throws IOException { 516 String regionName = HFileLink.getReferencedRegionName(path.getName()); 517 TableName tableName = HFileLink.getReferencedTableName(path.getName()); 518 if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) { 519 return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf), 520 HFileArchiveUtil.getArchivePath(conf), path); 521 } 522 return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path); 523 } 524 525 private FileChecksum getFileChecksum(final FileSystem fs, final Path path) { 526 try { 527 return fs.getFileChecksum(path); 528 } catch (IOException e) { 529 LOG.warn("Unable to get checksum for file=" + path, e); 530 return null; 531 } 532 } 533 534 /** 535 * Check if the two files are equal by looking at the file length, and at the checksum (if user 536 * has specified the verifyChecksum flag). 537 */ 538 private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) { 539 // Not matching length 540 if (inputStat.getLen() != outputStat.getLen()) return false; 541 542 // Mark files as equals, since user asked for no checksum verification 543 if (!verifyChecksum) return true; 544 545 // If checksums are not available, files are not the same. 546 FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath()); 547 if (inChecksum == null) return false; 548 549 FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath()); 550 if (outChecksum == null) return false; 551 552 return inChecksum.equals(outChecksum); 553 } 554 } 555 556 // ========================================================================== 557 // Input Format 558 // ========================================================================== 559 560 /** 561 * Extract the list of files (HFiles/WALs) to copy using Map-Reduce. 562 * @return list of files referenced by the snapshot (pair of path and size) 563 */ 564 private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf, 565 final FileSystem fs, final Path snapshotDir) throws IOException { 566 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); 567 568 final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>(); 569 final TableName table = TableName.valueOf(snapshotDesc.getTable()); 570 571 // Get snapshot files 572 LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list"); 573 SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc, 574 new SnapshotReferenceUtil.SnapshotVisitor() { 575 @Override 576 public void storeFile(final RegionInfo regionInfo, final String family, 577 final SnapshotRegionManifest.StoreFile storeFile) throws IOException { 578 Pair<SnapshotFileInfo, Long> snapshotFileAndSize = null; 579 if (!storeFile.hasReference()) { 580 String region = regionInfo.getEncodedName(); 581 String hfile = storeFile.getName(); 582 snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, region, family, hfile, 583 storeFile.hasFileSize() ? storeFile.getFileSize() : -1); 584 } else { 585 Pair<String, String> referredToRegionAndFile = 586 StoreFileInfo.getReferredToRegionAndFile(storeFile.getName()); 587 String referencedRegion = referredToRegionAndFile.getFirst(); 588 String referencedHFile = referredToRegionAndFile.getSecond(); 589 snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, referencedRegion, family, 590 referencedHFile, storeFile.hasFileSize() ? storeFile.getFileSize() : -1); 591 } 592 files.add(snapshotFileAndSize); 593 } 594 }); 595 596 return files; 597 } 598 599 private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs, 600 Configuration conf, TableName table, String region, String family, String hfile, long size) 601 throws IOException { 602 Path path = HFileLink.createPath(table, region, family, hfile); 603 SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE) 604 .setHfile(path.toString()).build(); 605 if (size == -1) { 606 size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen(); 607 } 608 return new Pair<>(fileInfo, size); 609 } 610 611 /** 612 * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible. 613 * The groups created will have similar amounts of bytes. 614 * <p> 615 * The algorithm used is pretty straightforward; the file list is sorted by size, and then each 616 * group fetch the bigger file available, iterating through groups alternating the direction. 617 */ 618 static List<List<Pair<SnapshotFileInfo, Long>>> 619 getBalancedSplits(final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) { 620 // Sort files by size, from small to big 621 Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() { 622 public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) { 623 long r = a.getSecond() - b.getSecond(); 624 return (r < 0) ? -1 : ((r > 0) ? 1 : 0); 625 } 626 }); 627 628 // create balanced groups 629 List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>(); 630 long[] sizeGroups = new long[ngroups]; 631 int hi = files.size() - 1; 632 int lo = 0; 633 634 List<Pair<SnapshotFileInfo, Long>> group; 635 int dir = 1; 636 int g = 0; 637 638 while (hi >= lo) { 639 if (g == fileGroups.size()) { 640 group = new LinkedList<>(); 641 fileGroups.add(group); 642 } else { 643 group = fileGroups.get(g); 644 } 645 646 Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--); 647 648 // add the hi one 649 sizeGroups[g] += fileInfo.getSecond(); 650 group.add(fileInfo); 651 652 // change direction when at the end or the beginning 653 g += dir; 654 if (g == ngroups) { 655 dir = -1; 656 g = ngroups - 1; 657 } else if (g < 0) { 658 dir = 1; 659 g = 0; 660 } 661 } 662 663 if (LOG.isDebugEnabled()) { 664 for (int i = 0; i < sizeGroups.length; ++i) { 665 LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i])); 666 } 667 } 668 669 return fileGroups; 670 } 671 672 private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> { 673 @Override 674 public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split, 675 TaskAttemptContext tac) throws IOException, InterruptedException { 676 return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys()); 677 } 678 679 @Override 680 public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { 681 Configuration conf = context.getConfiguration(); 682 Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR)); 683 FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf); 684 685 List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir); 686 int mappers = conf.getInt(CONF_NUM_SPLITS, 0); 687 if (mappers == 0 && snapshotFiles.size() > 0) { 688 mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10)); 689 mappers = Math.min(mappers, snapshotFiles.size()); 690 conf.setInt(CONF_NUM_SPLITS, mappers); 691 conf.setInt(MR_NUM_MAPS, mappers); 692 } 693 694 List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers); 695 List<InputSplit> splits = new ArrayList(groups.size()); 696 for (List<Pair<SnapshotFileInfo, Long>> files : groups) { 697 splits.add(new ExportSnapshotInputSplit(files)); 698 } 699 return splits; 700 } 701 702 private static class ExportSnapshotInputSplit extends InputSplit implements Writable { 703 private List<Pair<BytesWritable, Long>> files; 704 private long length; 705 706 public ExportSnapshotInputSplit() { 707 this.files = null; 708 } 709 710 public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) { 711 this.files = new ArrayList(snapshotFiles.size()); 712 for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) { 713 this.files.add( 714 new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond())); 715 this.length += fileInfo.getSecond(); 716 } 717 } 718 719 private List<Pair<BytesWritable, Long>> getSplitKeys() { 720 return files; 721 } 722 723 @Override 724 public long getLength() throws IOException, InterruptedException { 725 return length; 726 } 727 728 @Override 729 public String[] getLocations() throws IOException, InterruptedException { 730 return new String[] {}; 731 } 732 733 @Override 734 public void readFields(DataInput in) throws IOException { 735 int count = in.readInt(); 736 files = new ArrayList<>(count); 737 length = 0; 738 for (int i = 0; i < count; ++i) { 739 BytesWritable fileInfo = new BytesWritable(); 740 fileInfo.readFields(in); 741 long size = in.readLong(); 742 files.add(new Pair<>(fileInfo, size)); 743 length += size; 744 } 745 } 746 747 @Override 748 public void write(DataOutput out) throws IOException { 749 out.writeInt(files.size()); 750 for (final Pair<BytesWritable, Long> fileInfo : files) { 751 fileInfo.getFirst().write(out); 752 out.writeLong(fileInfo.getSecond()); 753 } 754 } 755 } 756 757 private static class ExportSnapshotRecordReader 758 extends RecordReader<BytesWritable, NullWritable> { 759 private final List<Pair<BytesWritable, Long>> files; 760 private long totalSize = 0; 761 private long procSize = 0; 762 private int index = -1; 763 764 ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) { 765 this.files = files; 766 for (Pair<BytesWritable, Long> fileInfo : files) { 767 totalSize += fileInfo.getSecond(); 768 } 769 } 770 771 @Override 772 public void close() { 773 } 774 775 @Override 776 public BytesWritable getCurrentKey() { 777 return files.get(index).getFirst(); 778 } 779 780 @Override 781 public NullWritable getCurrentValue() { 782 return NullWritable.get(); 783 } 784 785 @Override 786 public float getProgress() { 787 return (float) procSize / totalSize; 788 } 789 790 @Override 791 public void initialize(InputSplit split, TaskAttemptContext tac) { 792 } 793 794 @Override 795 public boolean nextKeyValue() { 796 if (index >= 0) { 797 procSize += files.get(index).getSecond(); 798 } 799 return (++index < files.size()); 800 } 801 } 802 } 803 804 // ========================================================================== 805 // Tool 806 // ========================================================================== 807 808 /** 809 * Run Map-Reduce Job to perform the files copy. 810 */ 811 private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName, 812 final Path snapshotDir, final boolean verifyChecksum, final String filesUser, 813 final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB) 814 throws IOException, InterruptedException, ClassNotFoundException { 815 Configuration conf = getConf(); 816 if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup); 817 if (filesUser != null) conf.set(CONF_FILES_USER, filesUser); 818 if (mappers > 0) { 819 conf.setInt(CONF_NUM_SPLITS, mappers); 820 conf.setInt(MR_NUM_MAPS, mappers); 821 } 822 conf.setInt(CONF_FILES_MODE, filesMode); 823 conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum); 824 conf.set(CONF_OUTPUT_ROOT, outputRoot.toString()); 825 conf.set(CONF_INPUT_ROOT, inputRoot.toString()); 826 conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB); 827 conf.set(CONF_SNAPSHOT_NAME, snapshotName); 828 conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString()); 829 830 String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName); 831 Job job = new Job(conf); 832 job.setJobName(jobname); 833 job.setJarByClass(ExportSnapshot.class); 834 TableMapReduceUtil.addDependencyJars(job); 835 job.setMapperClass(ExportMapper.class); 836 job.setInputFormatClass(ExportSnapshotInputFormat.class); 837 job.setOutputFormatClass(NullOutputFormat.class); 838 job.setMapSpeculativeExecution(false); 839 job.setNumReduceTasks(0); 840 841 // Acquire the delegation Tokens 842 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 843 TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf); 844 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 845 TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf); 846 847 // Run the MR Job 848 if (!job.waitForCompletion(true)) { 849 throw new ExportSnapshotException(job.getStatus().getFailureInfo()); 850 } 851 } 852 853 private void verifySnapshot(final Configuration baseConf, final FileSystem fs, final Path rootDir, 854 final Path snapshotDir) throws IOException { 855 // Update the conf with the current root dir, since may be a different cluster 856 Configuration conf = new Configuration(baseConf); 857 CommonFSUtils.setRootDir(conf, rootDir); 858 CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf)); 859 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); 860 SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc); 861 } 862 863 private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath, 864 BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException { 865 ExecutorService pool = Executors 866 .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS)); 867 List<Future<Void>> futures = new ArrayList<>(); 868 for (Path dstPath : traversedPath) { 869 Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath)); 870 futures.add(future); 871 } 872 try { 873 for (Future<Void> future : futures) { 874 future.get(); 875 } 876 } catch (InterruptedException | ExecutionException e) { 877 throw new IOException(e); 878 } finally { 879 pool.shutdownNow(); 880 } 881 } 882 883 private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup, 884 Configuration conf, List<Path> traversedPath) throws IOException { 885 setConfigParallel(outputFs, traversedPath, (fs, path) -> { 886 try { 887 fs.setOwner(path, filesUser, filesGroup); 888 } catch (IOException e) { 889 throw new RuntimeException( 890 "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e); 891 } 892 }, conf); 893 } 894 895 private void setPermissionParallel(final FileSystem outputFs, final short filesMode, 896 final List<Path> traversedPath, final Configuration conf) throws IOException { 897 if (filesMode <= 0) { 898 return; 899 } 900 FsPermission perm = new FsPermission(filesMode); 901 setConfigParallel(outputFs, traversedPath, (fs, path) -> { 902 try { 903 fs.setPermission(path, perm); 904 } catch (IOException e) { 905 throw new RuntimeException( 906 "set permission for file " + path + " to " + filesMode + " failed", e); 907 } 908 }, conf); 909 } 910 911 private boolean verifyTarget = true; 912 private boolean verifySource = true; 913 private boolean verifyChecksum = true; 914 private String snapshotName = null; 915 private String targetName = null; 916 private boolean overwrite = false; 917 private String filesGroup = null; 918 private String filesUser = null; 919 private Path outputRoot = null; 920 private Path inputRoot = null; 921 private int bandwidthMB = Integer.MAX_VALUE; 922 private int filesMode = 0; 923 private int mappers = 0; 924 private boolean resetTtl = false; 925 926 @Override 927 protected void processOptions(CommandLine cmd) { 928 snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName); 929 targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName); 930 if (cmd.hasOption(Options.COPY_TO.getLongOpt())) { 931 outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt())); 932 } 933 if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) { 934 inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt())); 935 } 936 mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers); 937 filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser); 938 filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup); 939 filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8); 940 bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB); 941 overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt()); 942 // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...). 943 verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt()); 944 verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt()); 945 verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt()); 946 resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt()); 947 } 948 949 /** 950 * Execute the export snapshot by copying the snapshot metadata, hfiles and wals. 951 * @return 0 on success, and != 0 upon failure. 952 */ 953 @Override 954 public int doWork() throws IOException { 955 Configuration conf = getConf(); 956 957 // Check user options 958 if (snapshotName == null) { 959 System.err.println("Snapshot name not provided."); 960 LOG.error("Use -h or --help for usage instructions."); 961 return 0; 962 } 963 964 if (outputRoot == null) { 965 System.err 966 .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided."); 967 LOG.error("Use -h or --help for usage instructions."); 968 return 0; 969 } 970 971 if (targetName == null) { 972 targetName = snapshotName; 973 } 974 if (inputRoot == null) { 975 inputRoot = CommonFSUtils.getRootDir(conf); 976 } else { 977 CommonFSUtils.setRootDir(conf, inputRoot); 978 } 979 980 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 981 FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf); 982 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 983 FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf); 984 boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false) 985 || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null; 986 Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot); 987 Path snapshotTmpDir = 988 SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf); 989 Path outputSnapshotDir = 990 SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot); 991 Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir; 992 LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot); 993 LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs, 994 outputRoot.toString(), skipTmp, initialOutputSnapshotDir); 995 996 // Verify snapshot source before copying files 997 if (verifySource) { 998 LOG.info("Verify snapshot source, inputFs={}, inputRoot={}, snapshotDir={}.", 999 inputFs.getUri(), inputRoot, snapshotDir); 1000 verifySnapshot(srcConf, inputFs, inputRoot, snapshotDir); 1001 } 1002 1003 // Find the necessary directory which need to change owner and group 1004 Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot); 1005 if (outputFs.exists(needSetOwnerDir)) { 1006 if (skipTmp) { 1007 needSetOwnerDir = outputSnapshotDir; 1008 } else { 1009 needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf); 1010 if (outputFs.exists(needSetOwnerDir)) { 1011 needSetOwnerDir = snapshotTmpDir; 1012 } 1013 } 1014 } 1015 1016 // Check if the snapshot already exists 1017 if (outputFs.exists(outputSnapshotDir)) { 1018 if (overwrite) { 1019 if (!outputFs.delete(outputSnapshotDir, true)) { 1020 System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir); 1021 return 1; 1022 } 1023 } else { 1024 System.err.println("The snapshot '" + targetName + "' already exists in the destination: " 1025 + outputSnapshotDir); 1026 return 1; 1027 } 1028 } 1029 1030 if (!skipTmp) { 1031 // Check if the snapshot already in-progress 1032 if (outputFs.exists(snapshotTmpDir)) { 1033 if (overwrite) { 1034 if (!outputFs.delete(snapshotTmpDir, true)) { 1035 System.err 1036 .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir); 1037 return 1; 1038 } 1039 } else { 1040 System.err 1041 .println("A snapshot with the same name '" + targetName + "' may be in-progress"); 1042 System.err 1043 .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, "); 1044 System.err 1045 .println("consider removing " + snapshotTmpDir + " by using the -overwrite option"); 1046 return 1; 1047 } 1048 } 1049 } 1050 1051 // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot> 1052 // The snapshot references must be copied before the hfiles otherwise the cleaner 1053 // will remove them because they are unreferenced. 1054 List<Path> travesedPaths = new ArrayList<>(); 1055 boolean copySucceeded = false; 1056 try { 1057 LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir); 1058 travesedPaths = 1059 FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf, 1060 conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS)); 1061 copySucceeded = true; 1062 } catch (IOException e) { 1063 throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir 1064 + " to=" + initialOutputSnapshotDir, e); 1065 } finally { 1066 if (copySucceeded) { 1067 if (filesUser != null || filesGroup != null) { 1068 LOG.warn( 1069 (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser) 1070 + (filesGroup == null 1071 ? "" 1072 : ", Change the group of " + needSetOwnerDir + " to " + filesGroup)); 1073 setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths); 1074 } 1075 if (filesMode > 0) { 1076 LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode); 1077 setPermissionParallel(outputFs, (short) filesMode, travesedPaths, conf); 1078 } 1079 } 1080 } 1081 1082 // Write a new .snapshotinfo if the target name is different from the source name or we want to 1083 // reset TTL for target snapshot. 1084 if (!targetName.equals(snapshotName) || resetTtl) { 1085 SnapshotDescription.Builder snapshotDescBuilder = 1086 SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder(); 1087 if (!targetName.equals(snapshotName)) { 1088 snapshotDescBuilder.setName(targetName); 1089 } 1090 if (resetTtl) { 1091 snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL); 1092 } 1093 SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(), 1094 initialOutputSnapshotDir, outputFs); 1095 if (filesUser != null || filesGroup != null) { 1096 outputFs.setOwner( 1097 new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser, 1098 filesGroup); 1099 } 1100 if (filesMode > 0) { 1101 outputFs.setPermission( 1102 new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), 1103 new FsPermission((short) filesMode)); 1104 } 1105 } 1106 1107 // Step 2 - Start MR Job to copy files 1108 // The snapshot references must be copied before the files otherwise the files gets removed 1109 // by the HFileArchiver, since they have no references. 1110 try { 1111 runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser, 1112 filesGroup, filesMode, mappers, bandwidthMB); 1113 1114 LOG.info("Finalize the Snapshot Export"); 1115 if (!skipTmp) { 1116 // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot> 1117 if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) { 1118 throw new ExportSnapshotException("Unable to rename snapshot directory from=" 1119 + snapshotTmpDir + " to=" + outputSnapshotDir); 1120 } 1121 } 1122 1123 // Step 4 - Verify snapshot integrity 1124 if (verifyTarget) { 1125 LOG.info("Verify snapshot integrity"); 1126 verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir); 1127 } 1128 1129 LOG.info("Export Completed: " + targetName); 1130 return 0; 1131 } catch (Exception e) { 1132 LOG.error("Snapshot export failed", e); 1133 if (!skipTmp) { 1134 outputFs.delete(snapshotTmpDir, true); 1135 } 1136 outputFs.delete(outputSnapshotDir, true); 1137 return 1; 1138 } 1139 } 1140 1141 @Override 1142 protected void printUsage() { 1143 super.printUsage(); 1144 System.out.println("\n" + "Examples:\n" + " hbase snapshot export \\\n" 1145 + " --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n" 1146 + " --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" + "\n" 1147 + " hbase snapshot export \\\n" 1148 + " --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n" 1149 + " --copy-to hdfs://srv1:50070/hbase"); 1150 } 1151 1152 @Override 1153 protected void addOptions() { 1154 addRequiredOption(Options.SNAPSHOT); 1155 addOption(Options.COPY_TO); 1156 addOption(Options.COPY_FROM); 1157 addOption(Options.TARGET_NAME); 1158 addOption(Options.NO_CHECKSUM_VERIFY); 1159 addOption(Options.NO_TARGET_VERIFY); 1160 addOption(Options.NO_SOURCE_VERIFY); 1161 addOption(Options.OVERWRITE); 1162 addOption(Options.CHUSER); 1163 addOption(Options.CHGROUP); 1164 addOption(Options.CHMOD); 1165 addOption(Options.MAPPERS); 1166 addOption(Options.BANDWIDTH); 1167 addOption(Options.RESET_TTL); 1168 } 1169 1170 public static void main(String[] args) { 1171 new ExportSnapshot().doStaticMain(args); 1172 } 1173}