001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.snapshot; 019 020import java.io.BufferedInputStream; 021import java.io.DataInput; 022import java.io.DataOutput; 023import java.io.FileNotFoundException; 024import java.io.IOException; 025import java.io.InputStream; 026import java.util.ArrayList; 027import java.util.Collections; 028import java.util.Comparator; 029import java.util.HashSet; 030import java.util.LinkedList; 031import java.util.List; 032import java.util.Set; 033import java.util.concurrent.ExecutionException; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import java.util.concurrent.Future; 037import java.util.function.BiConsumer; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.fs.FSDataInputStream; 040import org.apache.hadoop.fs.FSDataOutputStream; 041import org.apache.hadoop.fs.FileChecksum; 042import org.apache.hadoop.fs.FileStatus; 043import org.apache.hadoop.fs.FileSystem; 044import org.apache.hadoop.fs.Path; 045import org.apache.hadoop.fs.permission.FsPermission; 046import org.apache.hadoop.hbase.HBaseConfiguration; 047import org.apache.hadoop.hbase.HConstants; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.client.RegionInfo; 050import org.apache.hadoop.hbase.io.FileLink; 051import org.apache.hadoop.hbase.io.HFileLink; 052import org.apache.hadoop.hbase.io.WALLink; 053import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream; 054import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 055import org.apache.hadoop.hbase.mob.MobUtils; 056import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 057import org.apache.hadoop.hbase.util.AbstractHBaseTool; 058import org.apache.hadoop.hbase.util.CommonFSUtils; 059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 060import org.apache.hadoop.hbase.util.FSUtils; 061import org.apache.hadoop.hbase.util.HFileArchiveUtil; 062import org.apache.hadoop.hbase.util.Pair; 063import org.apache.hadoop.io.BytesWritable; 064import org.apache.hadoop.io.NullWritable; 065import org.apache.hadoop.io.Writable; 066import org.apache.hadoop.mapreduce.InputFormat; 067import org.apache.hadoop.mapreduce.InputSplit; 068import org.apache.hadoop.mapreduce.Job; 069import org.apache.hadoop.mapreduce.JobContext; 070import org.apache.hadoop.mapreduce.Mapper; 071import org.apache.hadoop.mapreduce.RecordReader; 072import org.apache.hadoop.mapreduce.TaskAttemptContext; 073import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 074import org.apache.hadoop.mapreduce.security.TokenCache; 075import org.apache.hadoop.util.StringUtils; 076import org.apache.hadoop.util.Tool; 077import org.apache.yetus.audience.InterfaceAudience; 078import org.slf4j.Logger; 079import org.slf4j.LoggerFactory; 080 081import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 082import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; 083 084import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 085import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; 086import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo; 087import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest; 088 089/** 090 * Export the specified snapshot to a given FileSystem. The .snapshot/name folder is copied to the 091 * destination cluster and then all the hfiles/wals are copied using a Map-Reduce Job in the 092 * .archive/ location. When everything is done, the second cluster can restore the snapshot. 093 */ 094@InterfaceAudience.Public 095public class ExportSnapshot extends AbstractHBaseTool implements Tool { 096 public static final String NAME = "exportsnapshot"; 097 /** Configuration prefix for overrides for the source filesystem */ 098 public static final String CONF_SOURCE_PREFIX = NAME + ".from."; 099 /** Configuration prefix for overrides for the destination filesystem */ 100 public static final String CONF_DEST_PREFIX = NAME + ".to."; 101 102 private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class); 103 104 private static final String MR_NUM_MAPS = "mapreduce.job.maps"; 105 private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits"; 106 private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name"; 107 private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir"; 108 private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user"; 109 private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group"; 110 private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode"; 111 private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify"; 112 private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root"; 113 private static final String CONF_INPUT_ROOT = "snapshot.export.input.root"; 114 private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size"; 115 private static final String CONF_REPORT_SIZE = "snapshot.export.report.size"; 116 private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group"; 117 private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb"; 118 private static final String CONF_MR_JOB_NAME = "mapreduce.job.name"; 119 protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp"; 120 private static final String CONF_COPY_MANIFEST_THREADS = 121 "snapshot.export.copy.references.threads"; 122 private static final int DEFAULT_COPY_MANIFEST_THREADS = 123 Runtime.getRuntime().availableProcessors(); 124 125 static class Testing { 126 static final String CONF_TEST_FAILURE = "test.snapshot.export.failure"; 127 static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count"; 128 int failuresCountToInject = 0; 129 int injectedFailureCount = 0; 130 } 131 132 // Command line options and defaults. 133 static final class Options { 134 static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore."); 135 static final Option TARGET_NAME = 136 new Option(null, "target", true, "Target name for the snapshot."); 137 static final Option COPY_TO = 138 new Option(null, "copy-to", true, "Remote " + "destination hdfs://"); 139 static final Option COPY_FROM = 140 new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)"); 141 static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false, 142 "Do not verify checksum, use name+length only."); 143 static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false, 144 "Do not verify the exported snapshot's expiration status and integrity."); 145 static final Option NO_SOURCE_VERIFY = new Option(null, "no-source-verify", false, 146 "Do not verify the source snapshot's expiration status and integrity."); 147 static final Option OVERWRITE = 148 new Option(null, "overwrite", false, "Rewrite the snapshot manifest if already exists."); 149 static final Option CHUSER = 150 new Option(null, "chuser", true, "Change the owner of the files to the specified one."); 151 static final Option CHGROUP = 152 new Option(null, "chgroup", true, "Change the group of the files to the specified one."); 153 static final Option CHMOD = 154 new Option(null, "chmod", true, "Change the permission of the files to the specified one."); 155 static final Option MAPPERS = new Option(null, "mappers", true, 156 "Number of mappers to use during the copy (mapreduce.job.maps)."); 157 static final Option BANDWIDTH = 158 new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second."); 159 static final Option RESET_TTL = 160 new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot"); 161 } 162 163 // Export Map-Reduce Counters, to keep track of the progress 164 public enum Counter { 165 MISSING_FILES, 166 FILES_COPIED, 167 FILES_SKIPPED, 168 COPY_FAILED, 169 BYTES_EXPECTED, 170 BYTES_SKIPPED, 171 BYTES_COPIED 172 } 173 174 /** 175 * Indicates the checksum comparison result. 176 */ 177 public enum ChecksumComparison { 178 TRUE, // checksum comparison is compatible and true. 179 FALSE, // checksum comparison is compatible and false. 180 INCOMPATIBLE, // checksum comparison is not compatible. 181 } 182 183 private static class ExportMapper 184 extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> { 185 private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class); 186 final static int REPORT_SIZE = 1 * 1024 * 1024; 187 final static int BUFFER_SIZE = 64 * 1024; 188 189 private boolean verifyChecksum; 190 private String filesGroup; 191 private String filesUser; 192 private short filesMode; 193 private int bufferSize; 194 private int reportSize; 195 196 private FileSystem outputFs; 197 private Path outputArchive; 198 private Path outputRoot; 199 200 private FileSystem inputFs; 201 private Path inputArchive; 202 private Path inputRoot; 203 204 private static Testing testing = new Testing(); 205 206 @Override 207 public void setup(Context context) throws IOException { 208 Configuration conf = context.getConfiguration(); 209 210 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 211 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 212 213 verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true); 214 215 filesGroup = conf.get(CONF_FILES_GROUP); 216 filesUser = conf.get(CONF_FILES_USER); 217 filesMode = (short) conf.getInt(CONF_FILES_MODE, 0); 218 outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT)); 219 inputRoot = new Path(conf.get(CONF_INPUT_ROOT)); 220 221 inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); 222 outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); 223 224 try { 225 inputFs = FileSystem.get(inputRoot.toUri(), srcConf); 226 } catch (IOException e) { 227 throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e); 228 } 229 230 try { 231 outputFs = FileSystem.get(outputRoot.toUri(), destConf); 232 } catch (IOException e) { 233 throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e); 234 } 235 236 // Use the default block size of the outputFs if bigger 237 int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE); 238 bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize); 239 LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize)); 240 reportSize = conf.getInt(CONF_REPORT_SIZE, REPORT_SIZE); 241 242 for (Counter c : Counter.values()) { 243 context.getCounter(c).increment(0); 244 } 245 if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) { 246 testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0); 247 // Get number of times we have already injected failure based on attempt number of this 248 // task. 249 testing.injectedFailureCount = context.getTaskAttemptID().getId(); 250 } 251 } 252 253 @Override 254 public void map(BytesWritable key, NullWritable value, Context context) 255 throws InterruptedException, IOException { 256 SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes()); 257 Path outputPath = getOutputPath(inputInfo); 258 259 copyFile(context, inputInfo, outputPath); 260 } 261 262 /** 263 * Returns the location where the inputPath will be copied. 264 */ 265 private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException { 266 Path path = null; 267 switch (inputInfo.getType()) { 268 case HFILE: 269 Path inputPath = new Path(inputInfo.getHfile()); 270 String family = inputPath.getParent().getName(); 271 TableName table = HFileLink.getReferencedTableName(inputPath.getName()); 272 String region = HFileLink.getReferencedRegionName(inputPath.getName()); 273 String hfile = HFileLink.getReferencedHFileName(inputPath.getName()); 274 path = new Path(CommonFSUtils.getTableDir(new Path("./"), table), 275 new Path(region, new Path(family, hfile))); 276 break; 277 case WAL: 278 LOG.warn("snapshot does not keeps WALs: " + inputInfo); 279 break; 280 default: 281 throw new IOException("Invalid File Type: " + inputInfo.getType().toString()); 282 } 283 return new Path(outputArchive, path); 284 } 285 286 @SuppressWarnings("checkstyle:linelength") 287 /** 288 * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in 289 * {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}. 290 */ 291 private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo) 292 throws IOException { 293 if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return; 294 if (testing.injectedFailureCount >= testing.failuresCountToInject) return; 295 testing.injectedFailureCount++; 296 context.getCounter(Counter.COPY_FAILED).increment(1); 297 LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount); 298 throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s", 299 testing.injectedFailureCount, testing.failuresCountToInject, inputInfo)); 300 } 301 302 private void copyFile(final Context context, final SnapshotFileInfo inputInfo, 303 final Path outputPath) throws IOException { 304 // Get the file information 305 FileStatus inputStat = getSourceFileStatus(context, inputInfo); 306 307 // Verify if the output file exists and is the same that we want to copy 308 if (outputFs.exists(outputPath)) { 309 FileStatus outputStat = outputFs.getFileStatus(outputPath); 310 if (outputStat != null && sameFile(inputStat, outputStat)) { 311 LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file."); 312 context.getCounter(Counter.FILES_SKIPPED).increment(1); 313 context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen()); 314 return; 315 } 316 } 317 318 InputStream in = openSourceFile(context, inputInfo); 319 int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100); 320 if (Integer.MAX_VALUE != bandwidthMB) { 321 in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L); 322 } 323 324 Path inputPath = inputStat.getPath(); 325 try { 326 context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen()); 327 328 // Ensure that the output folder is there and copy the file 329 createOutputPath(outputPath.getParent()); 330 FSDataOutputStream out = outputFs.create(outputPath, true); 331 332 long stime = EnvironmentEdgeManager.currentTime(); 333 long totalBytesWritten = 334 copyData(context, inputPath, in, outputPath, out, inputStat.getLen()); 335 336 // Verify the file length and checksum 337 verifyCopyResult(inputStat, outputFs.getFileStatus(outputPath)); 338 339 long etime = EnvironmentEdgeManager.currentTime(); 340 LOG.info("copy completed for input=" + inputPath + " output=" + outputPath); 341 LOG 342 .info("size=" + totalBytesWritten + " (" + StringUtils.humanReadableInt(totalBytesWritten) 343 + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String 344 .format(" %.3fM/sec", (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0)); 345 context.getCounter(Counter.FILES_COPIED).increment(1); 346 347 // Try to Preserve attributes 348 if (!preserveAttributes(outputPath, inputStat)) { 349 LOG.warn("You may have to run manually chown on: " + outputPath); 350 } 351 } catch (IOException e) { 352 LOG.error("Error copying " + inputPath + " to " + outputPath, e); 353 context.getCounter(Counter.COPY_FAILED).increment(1); 354 throw e; 355 } finally { 356 injectTestFailure(context, inputInfo); 357 } 358 } 359 360 /** 361 * Create the output folder and optionally set ownership. 362 */ 363 private void createOutputPath(final Path path) throws IOException { 364 if (filesUser == null && filesGroup == null) { 365 outputFs.mkdirs(path); 366 } else { 367 Path parent = path.getParent(); 368 if (!outputFs.exists(parent) && !parent.isRoot()) { 369 createOutputPath(parent); 370 } 371 outputFs.mkdirs(path); 372 if (filesUser != null || filesGroup != null) { 373 // override the owner when non-null user/group is specified 374 outputFs.setOwner(path, filesUser, filesGroup); 375 } 376 if (filesMode > 0) { 377 outputFs.setPermission(path, new FsPermission(filesMode)); 378 } 379 } 380 } 381 382 /** 383 * Try to Preserve the files attribute selected by the user copying them from the source file 384 * This is only required when you are exporting as a different user than "hbase" or on a system 385 * that doesn't have the "hbase" user. This is not considered a blocking failure since the user 386 * can force a chmod with the user that knows is available on the system. 387 */ 388 private boolean preserveAttributes(final Path path, final FileStatus refStat) { 389 FileStatus stat; 390 try { 391 stat = outputFs.getFileStatus(path); 392 } catch (IOException e) { 393 LOG.warn("Unable to get the status for file=" + path); 394 return false; 395 } 396 397 try { 398 if (filesMode > 0 && stat.getPermission().toShort() != filesMode) { 399 outputFs.setPermission(path, new FsPermission(filesMode)); 400 } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) { 401 outputFs.setPermission(path, refStat.getPermission()); 402 } 403 } catch (IOException e) { 404 LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage()); 405 return false; 406 } 407 408 boolean hasRefStat = (refStat != null); 409 String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner(); 410 String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup(); 411 if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) { 412 try { 413 if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) { 414 outputFs.setOwner(path, user, group); 415 } 416 } catch (IOException e) { 417 LOG.warn( 418 "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage()); 419 LOG.warn("The user/group may not exist on the destination cluster: user=" + user 420 + " group=" + group); 421 return false; 422 } 423 } 424 425 return true; 426 } 427 428 private boolean stringIsNotEmpty(final String str) { 429 return str != null && str.length() > 0; 430 } 431 432 private long copyData(final Context context, final Path inputPath, final InputStream in, 433 final Path outputPath, final FSDataOutputStream out, final long inputFileSize) 434 throws IOException { 435 final String statusMessage = 436 "copied %s/" + StringUtils.humanReadableInt(inputFileSize) + " (%.1f%%)"; 437 438 try { 439 byte[] buffer = new byte[bufferSize]; 440 long totalBytesWritten = 0; 441 int reportBytes = 0; 442 int bytesRead; 443 444 while ((bytesRead = in.read(buffer)) > 0) { 445 out.write(buffer, 0, bytesRead); 446 totalBytesWritten += bytesRead; 447 reportBytes += bytesRead; 448 449 if (reportBytes >= reportSize) { 450 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); 451 context.setStatus( 452 String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten), 453 (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath 454 + " to " + outputPath); 455 reportBytes = 0; 456 } 457 } 458 459 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); 460 context 461 .setStatus(String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten), 462 (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to " 463 + outputPath); 464 465 return totalBytesWritten; 466 } finally { 467 out.close(); 468 in.close(); 469 } 470 } 471 472 /** 473 * Try to open the "source" file. Throws an IOException if the communication with the inputFs 474 * fail or if the file is not found. 475 */ 476 private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo) 477 throws IOException { 478 try { 479 Configuration conf = context.getConfiguration(); 480 FileLink link = null; 481 switch (fileInfo.getType()) { 482 case HFILE: 483 Path inputPath = new Path(fileInfo.getHfile()); 484 link = getFileLink(inputPath, conf); 485 break; 486 case WAL: 487 String serverName = fileInfo.getWalServer(); 488 String logName = fileInfo.getWalName(); 489 link = new WALLink(inputRoot, serverName, logName); 490 break; 491 default: 492 throw new IOException("Invalid File Type: " + fileInfo.getType().toString()); 493 } 494 return link.open(inputFs); 495 } catch (IOException e) { 496 context.getCounter(Counter.MISSING_FILES).increment(1); 497 LOG.error("Unable to open source file=" + fileInfo.toString(), e); 498 throw e; 499 } 500 } 501 502 private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo) 503 throws IOException { 504 try { 505 Configuration conf = context.getConfiguration(); 506 FileLink link = null; 507 switch (fileInfo.getType()) { 508 case HFILE: 509 Path inputPath = new Path(fileInfo.getHfile()); 510 link = getFileLink(inputPath, conf); 511 break; 512 case WAL: 513 link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName()); 514 break; 515 default: 516 throw new IOException("Invalid File Type: " + fileInfo.getType().toString()); 517 } 518 return link.getFileStatus(inputFs); 519 } catch (FileNotFoundException e) { 520 context.getCounter(Counter.MISSING_FILES).increment(1); 521 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e); 522 throw e; 523 } catch (IOException e) { 524 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e); 525 throw e; 526 } 527 } 528 529 private FileLink getFileLink(Path path, Configuration conf) throws IOException { 530 String regionName = HFileLink.getReferencedRegionName(path.getName()); 531 TableName tableName = HFileLink.getReferencedTableName(path.getName()); 532 if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) { 533 return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf), 534 HFileArchiveUtil.getArchivePath(conf), path); 535 } 536 return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path); 537 } 538 539 private FileChecksum getFileChecksum(final FileSystem fs, final Path path) { 540 try { 541 return fs.getFileChecksum(path); 542 } catch (IOException e) { 543 LOG.warn("Unable to get checksum for file=" + path, e); 544 return null; 545 } 546 } 547 548 /** 549 * Utility to compare the file length and checksums for the paths specified. 550 */ 551 private void verifyCopyResult(final FileStatus inputStat, final FileStatus outputStat) 552 throws IOException { 553 long inputLen = inputStat.getLen(); 554 long outputLen = outputStat.getLen(); 555 Path inputPath = inputStat.getPath(); 556 Path outputPath = outputStat.getPath(); 557 558 if (inputLen != outputLen) { 559 throw new IOException("Mismatch in length of input:" + inputPath + " (" + inputLen 560 + ") and output:" + outputPath + " (" + outputLen + ")"); 561 } 562 563 // If length==0, we will skip checksum 564 if (inputLen != 0 && verifyChecksum) { 565 FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath()); 566 FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath()); 567 568 ChecksumComparison checksumComparison = verifyChecksum(inChecksum, outChecksum); 569 if (!checksumComparison.equals(ChecksumComparison.TRUE)) { 570 StringBuilder errMessage = new StringBuilder("Checksum mismatch between ") 571 .append(inputPath).append(" and ").append(outputPath).append("."); 572 573 boolean addSkipHint = false; 574 String inputScheme = inputFs.getScheme(); 575 String outputScheme = outputFs.getScheme(); 576 if (!inputScheme.equals(outputScheme)) { 577 errMessage.append(" Input and output filesystems are of different types.\n") 578 .append("Their checksum algorithms may be incompatible."); 579 addSkipHint = true; 580 } else if (inputStat.getBlockSize() != outputStat.getBlockSize()) { 581 errMessage.append(" Input and output differ in block-size."); 582 addSkipHint = true; 583 } else if ( 584 inChecksum != null && outChecksum != null 585 && !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName()) 586 ) { 587 errMessage.append(" Input and output checksum algorithms are of different types."); 588 addSkipHint = true; 589 } 590 if (addSkipHint) { 591 errMessage 592 .append(" You can choose file-level checksum validation via " 593 + "-Ddfs.checksum.combine.mode=COMPOSITE_CRC when block-sizes" 594 + " or filesystems are different.\n") 595 .append(" Or you can skip checksum-checks altogether with -no-checksum-verify,") 596 .append( 597 " for the table backup scenario, you should use -i option to skip checksum-checks.\n") 598 .append(" (NOTE: By skipping checksums, one runs the risk of " 599 + "masking data-corruption during file-transfer.)\n"); 600 } 601 throw new IOException(errMessage.toString()); 602 } 603 } 604 } 605 606 /** 607 * Utility to compare checksums 608 */ 609 private ChecksumComparison verifyChecksum(final FileChecksum inChecksum, 610 final FileChecksum outChecksum) { 611 // If the input or output checksum is null, or the algorithms of input and output are not 612 // equal, that means there is no comparison 613 // and return not compatible. else if matched, return compatible with the matched result. 614 if ( 615 inChecksum == null || outChecksum == null 616 || !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName()) 617 ) { 618 return ChecksumComparison.INCOMPATIBLE; 619 } else if (inChecksum.equals(outChecksum)) { 620 return ChecksumComparison.TRUE; 621 } 622 return ChecksumComparison.FALSE; 623 } 624 625 /** 626 * Check if the two files are equal by looking at the file length, and at the checksum (if user 627 * has specified the verifyChecksum flag). 628 */ 629 private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) { 630 // Not matching length 631 if (inputStat.getLen() != outputStat.getLen()) return false; 632 633 // Mark files as equals, since user asked for no checksum verification 634 if (!verifyChecksum) return true; 635 636 // If checksums are not available, files are not the same. 637 FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath()); 638 if (inChecksum == null) return false; 639 640 FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath()); 641 if (outChecksum == null) return false; 642 643 return inChecksum.equals(outChecksum); 644 } 645 } 646 647 // ========================================================================== 648 // Input Format 649 // ========================================================================== 650 651 /** 652 * Extract the list of files (HFiles/WALs) to copy using Map-Reduce. 653 * @return list of files referenced by the snapshot (pair of path and size) 654 */ 655 private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf, 656 final FileSystem fs, final Path snapshotDir) throws IOException { 657 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); 658 659 final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>(); 660 final TableName table = TableName.valueOf(snapshotDesc.getTable()); 661 662 // Get snapshot files 663 LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list"); 664 Set<String> addedFiles = new HashSet<>(); 665 SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc, 666 new SnapshotReferenceUtil.SnapshotVisitor() { 667 @Override 668 public void storeFile(final RegionInfo regionInfo, final String family, 669 final SnapshotRegionManifest.StoreFile storeFile) throws IOException { 670 Pair<SnapshotFileInfo, Long> snapshotFileAndSize = null; 671 if (!storeFile.hasReference()) { 672 String region = regionInfo.getEncodedName(); 673 String hfile = storeFile.getName(); 674 snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, region, family, hfile, 675 storeFile.hasFileSize() ? storeFile.getFileSize() : -1); 676 } else { 677 Pair<String, String> referredToRegionAndFile = 678 StoreFileInfo.getReferredToRegionAndFile(storeFile.getName()); 679 String referencedRegion = referredToRegionAndFile.getFirst(); 680 String referencedHFile = referredToRegionAndFile.getSecond(); 681 snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, referencedRegion, family, 682 referencedHFile, storeFile.hasFileSize() ? storeFile.getFileSize() : -1); 683 } 684 String fileToExport = snapshotFileAndSize.getFirst().getHfile(); 685 if (!addedFiles.contains(fileToExport)) { 686 files.add(snapshotFileAndSize); 687 addedFiles.add(fileToExport); 688 } else { 689 LOG.debug("Skip the existing file: {}.", fileToExport); 690 } 691 } 692 }); 693 694 return files; 695 } 696 697 private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs, 698 Configuration conf, TableName table, String region, String family, String hfile, long size) 699 throws IOException { 700 Path path = HFileLink.createPath(table, region, family, hfile); 701 SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE) 702 .setHfile(path.toString()).build(); 703 if (size == -1) { 704 size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen(); 705 } 706 return new Pair<>(fileInfo, size); 707 } 708 709 /** 710 * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible. 711 * The groups created will have similar amounts of bytes. 712 * <p> 713 * The algorithm used is pretty straightforward; the file list is sorted by size, and then each 714 * group fetch the bigger file available, iterating through groups alternating the direction. 715 */ 716 static List<List<Pair<SnapshotFileInfo, Long>>> 717 getBalancedSplits(final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) { 718 // Sort files by size, from small to big 719 Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() { 720 public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) { 721 long r = a.getSecond() - b.getSecond(); 722 return (r < 0) ? -1 : ((r > 0) ? 1 : 0); 723 } 724 }); 725 726 // create balanced groups 727 List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>(); 728 long[] sizeGroups = new long[ngroups]; 729 int hi = files.size() - 1; 730 int lo = 0; 731 732 List<Pair<SnapshotFileInfo, Long>> group; 733 int dir = 1; 734 int g = 0; 735 736 while (hi >= lo) { 737 if (g == fileGroups.size()) { 738 group = new LinkedList<>(); 739 fileGroups.add(group); 740 } else { 741 group = fileGroups.get(g); 742 } 743 744 Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--); 745 746 // add the hi one 747 sizeGroups[g] += fileInfo.getSecond(); 748 group.add(fileInfo); 749 750 // change direction when at the end or the beginning 751 g += dir; 752 if (g == ngroups) { 753 dir = -1; 754 g = ngroups - 1; 755 } else if (g < 0) { 756 dir = 1; 757 g = 0; 758 } 759 } 760 761 if (LOG.isDebugEnabled()) { 762 for (int i = 0; i < sizeGroups.length; ++i) { 763 LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i])); 764 } 765 } 766 767 return fileGroups; 768 } 769 770 private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> { 771 @Override 772 public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split, 773 TaskAttemptContext tac) throws IOException, InterruptedException { 774 return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys()); 775 } 776 777 @Override 778 public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { 779 Configuration conf = context.getConfiguration(); 780 Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR)); 781 FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf); 782 783 List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir); 784 int mappers = conf.getInt(CONF_NUM_SPLITS, 0); 785 if (mappers == 0 && snapshotFiles.size() > 0) { 786 mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10)); 787 mappers = Math.min(mappers, snapshotFiles.size()); 788 conf.setInt(CONF_NUM_SPLITS, mappers); 789 conf.setInt(MR_NUM_MAPS, mappers); 790 } 791 792 List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers); 793 List<InputSplit> splits = new ArrayList(groups.size()); 794 for (List<Pair<SnapshotFileInfo, Long>> files : groups) { 795 splits.add(new ExportSnapshotInputSplit(files)); 796 } 797 return splits; 798 } 799 800 private static class ExportSnapshotInputSplit extends InputSplit implements Writable { 801 private List<Pair<BytesWritable, Long>> files; 802 private long length; 803 804 public ExportSnapshotInputSplit() { 805 this.files = null; 806 } 807 808 public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) { 809 this.files = new ArrayList(snapshotFiles.size()); 810 for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) { 811 this.files.add( 812 new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond())); 813 this.length += fileInfo.getSecond(); 814 } 815 } 816 817 private List<Pair<BytesWritable, Long>> getSplitKeys() { 818 return files; 819 } 820 821 @Override 822 public long getLength() throws IOException, InterruptedException { 823 return length; 824 } 825 826 @Override 827 public String[] getLocations() throws IOException, InterruptedException { 828 return new String[] {}; 829 } 830 831 @Override 832 public void readFields(DataInput in) throws IOException { 833 int count = in.readInt(); 834 files = new ArrayList<>(count); 835 length = 0; 836 for (int i = 0; i < count; ++i) { 837 BytesWritable fileInfo = new BytesWritable(); 838 fileInfo.readFields(in); 839 long size = in.readLong(); 840 files.add(new Pair<>(fileInfo, size)); 841 length += size; 842 } 843 } 844 845 @Override 846 public void write(DataOutput out) throws IOException { 847 out.writeInt(files.size()); 848 for (final Pair<BytesWritable, Long> fileInfo : files) { 849 fileInfo.getFirst().write(out); 850 out.writeLong(fileInfo.getSecond()); 851 } 852 } 853 } 854 855 private static class ExportSnapshotRecordReader 856 extends RecordReader<BytesWritable, NullWritable> { 857 private final List<Pair<BytesWritable, Long>> files; 858 private long totalSize = 0; 859 private long procSize = 0; 860 private int index = -1; 861 862 ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) { 863 this.files = files; 864 for (Pair<BytesWritable, Long> fileInfo : files) { 865 totalSize += fileInfo.getSecond(); 866 } 867 } 868 869 @Override 870 public void close() { 871 } 872 873 @Override 874 public BytesWritable getCurrentKey() { 875 return files.get(index).getFirst(); 876 } 877 878 @Override 879 public NullWritable getCurrentValue() { 880 return NullWritable.get(); 881 } 882 883 @Override 884 public float getProgress() { 885 return (float) procSize / totalSize; 886 } 887 888 @Override 889 public void initialize(InputSplit split, TaskAttemptContext tac) { 890 } 891 892 @Override 893 public boolean nextKeyValue() { 894 if (index >= 0) { 895 procSize += files.get(index).getSecond(); 896 } 897 return (++index < files.size()); 898 } 899 } 900 } 901 902 // ========================================================================== 903 // Tool 904 // ========================================================================== 905 906 /** 907 * Run Map-Reduce Job to perform the files copy. 908 */ 909 private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName, 910 final Path snapshotDir, final boolean verifyChecksum, final String filesUser, 911 final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB) 912 throws IOException, InterruptedException, ClassNotFoundException { 913 Configuration conf = getConf(); 914 if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup); 915 if (filesUser != null) conf.set(CONF_FILES_USER, filesUser); 916 if (mappers > 0) { 917 conf.setInt(CONF_NUM_SPLITS, mappers); 918 conf.setInt(MR_NUM_MAPS, mappers); 919 } 920 conf.setInt(CONF_FILES_MODE, filesMode); 921 conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum); 922 conf.set(CONF_OUTPUT_ROOT, outputRoot.toString()); 923 conf.set(CONF_INPUT_ROOT, inputRoot.toString()); 924 conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB); 925 conf.set(CONF_SNAPSHOT_NAME, snapshotName); 926 conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString()); 927 928 String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName); 929 Job job = new Job(conf); 930 job.setJobName(jobname); 931 job.setJarByClass(ExportSnapshot.class); 932 TableMapReduceUtil.addDependencyJars(job); 933 job.setMapperClass(ExportMapper.class); 934 job.setInputFormatClass(ExportSnapshotInputFormat.class); 935 job.setOutputFormatClass(NullOutputFormat.class); 936 job.setMapSpeculativeExecution(false); 937 job.setNumReduceTasks(0); 938 939 // Acquire the delegation Tokens 940 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 941 TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf); 942 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 943 TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf); 944 945 // Run the MR Job 946 if (!job.waitForCompletion(true)) { 947 throw new ExportSnapshotException(job.getStatus().getFailureInfo()); 948 } 949 } 950 951 private void verifySnapshot(final SnapshotDescription snapshotDesc, final Configuration baseConf, 952 final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException { 953 // Update the conf with the current root dir, since may be a different cluster 954 Configuration conf = new Configuration(baseConf); 955 CommonFSUtils.setRootDir(conf, rootDir); 956 CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf)); 957 boolean isExpired = SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDesc.getTtl(), 958 snapshotDesc.getCreationTime(), EnvironmentEdgeManager.currentTime()); 959 if (isExpired) { 960 throw new SnapshotTTLExpiredException(ProtobufUtil.createSnapshotDesc(snapshotDesc)); 961 } 962 SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc); 963 } 964 965 private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath, 966 BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException { 967 ExecutorService pool = Executors 968 .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS)); 969 List<Future<Void>> futures = new ArrayList<>(); 970 for (Path dstPath : traversedPath) { 971 Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath)); 972 futures.add(future); 973 } 974 try { 975 for (Future<Void> future : futures) { 976 future.get(); 977 } 978 } catch (InterruptedException | ExecutionException e) { 979 throw new IOException(e); 980 } finally { 981 pool.shutdownNow(); 982 } 983 } 984 985 private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup, 986 Configuration conf, List<Path> traversedPath) throws IOException { 987 setConfigParallel(outputFs, traversedPath, (fs, path) -> { 988 try { 989 fs.setOwner(path, filesUser, filesGroup); 990 } catch (IOException e) { 991 throw new RuntimeException( 992 "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e); 993 } 994 }, conf); 995 } 996 997 private void setPermissionParallel(final FileSystem outputFs, final short filesMode, 998 final List<Path> traversedPath, final Configuration conf) throws IOException { 999 if (filesMode <= 0) { 1000 return; 1001 } 1002 FsPermission perm = new FsPermission(filesMode); 1003 setConfigParallel(outputFs, traversedPath, (fs, path) -> { 1004 try { 1005 fs.setPermission(path, perm); 1006 } catch (IOException e) { 1007 throw new RuntimeException( 1008 "set permission for file " + path + " to " + filesMode + " failed", e); 1009 } 1010 }, conf); 1011 } 1012 1013 private boolean verifyTarget = true; 1014 private boolean verifySource = true; 1015 private boolean verifyChecksum = true; 1016 private String snapshotName = null; 1017 private String targetName = null; 1018 private boolean overwrite = false; 1019 private String filesGroup = null; 1020 private String filesUser = null; 1021 private Path outputRoot = null; 1022 private Path inputRoot = null; 1023 private int bandwidthMB = Integer.MAX_VALUE; 1024 private int filesMode = 0; 1025 private int mappers = 0; 1026 private boolean resetTtl = false; 1027 1028 @Override 1029 protected void processOptions(CommandLine cmd) { 1030 snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName); 1031 targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName); 1032 if (cmd.hasOption(Options.COPY_TO.getLongOpt())) { 1033 outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt())); 1034 } 1035 if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) { 1036 inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt())); 1037 } 1038 mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers); 1039 filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser); 1040 filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup); 1041 filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8); 1042 bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB); 1043 overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt()); 1044 // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...). 1045 verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt()); 1046 verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt()); 1047 verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt()); 1048 resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt()); 1049 } 1050 1051 /** 1052 * Execute the export snapshot by copying the snapshot metadata, hfiles and wals. 1053 * @return 0 on success, and != 0 upon failure. 1054 */ 1055 @Override 1056 public int doWork() throws IOException { 1057 Configuration conf = getConf(); 1058 1059 // Check user options 1060 if (snapshotName == null) { 1061 System.err.println("Snapshot name not provided."); 1062 LOG.error("Use -h or --help for usage instructions."); 1063 return EXIT_FAILURE; 1064 } 1065 1066 if (outputRoot == null) { 1067 System.err 1068 .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided."); 1069 LOG.error("Use -h or --help for usage instructions."); 1070 return EXIT_FAILURE; 1071 } 1072 1073 if (targetName == null) { 1074 targetName = snapshotName; 1075 } 1076 if (inputRoot == null) { 1077 inputRoot = CommonFSUtils.getRootDir(conf); 1078 } else { 1079 CommonFSUtils.setRootDir(conf, inputRoot); 1080 } 1081 1082 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 1083 FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf); 1084 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 1085 FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf); 1086 boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false) 1087 || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null; 1088 Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot); 1089 Path snapshotTmpDir = 1090 SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf); 1091 Path outputSnapshotDir = 1092 SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot); 1093 Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir; 1094 LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot); 1095 LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs, 1096 outputRoot.toString(), skipTmp, initialOutputSnapshotDir); 1097 1098 // throw CorruptedSnapshotException if we can't read the snapshot info. 1099 SnapshotDescription sourceSnapshotDesc = 1100 SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir); 1101 1102 // Verify snapshot source before copying files 1103 if (verifySource) { 1104 LOG.info("Verify the source snapshot's expiration status and integrity."); 1105 verifySnapshot(sourceSnapshotDesc, srcConf, inputFs, inputRoot, snapshotDir); 1106 } 1107 1108 // Find the necessary directory which need to change owner and group 1109 Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot); 1110 if (outputFs.exists(needSetOwnerDir)) { 1111 if (skipTmp) { 1112 needSetOwnerDir = outputSnapshotDir; 1113 } else { 1114 needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf); 1115 if (outputFs.exists(needSetOwnerDir)) { 1116 needSetOwnerDir = snapshotTmpDir; 1117 } 1118 } 1119 } 1120 1121 // Check if the snapshot already exists 1122 if (outputFs.exists(outputSnapshotDir)) { 1123 if (overwrite) { 1124 if (!outputFs.delete(outputSnapshotDir, true)) { 1125 System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir); 1126 return EXIT_FAILURE; 1127 } 1128 } else { 1129 System.err.println("The snapshot '" + targetName + "' already exists in the destination: " 1130 + outputSnapshotDir); 1131 return EXIT_FAILURE; 1132 } 1133 } 1134 1135 if (!skipTmp) { 1136 // Check if the snapshot already in-progress 1137 if (outputFs.exists(snapshotTmpDir)) { 1138 if (overwrite) { 1139 if (!outputFs.delete(snapshotTmpDir, true)) { 1140 System.err 1141 .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir); 1142 return EXIT_FAILURE; 1143 } 1144 } else { 1145 System.err 1146 .println("A snapshot with the same name '" + targetName + "' may be in-progress"); 1147 System.err 1148 .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, "); 1149 System.err 1150 .println("consider removing " + snapshotTmpDir + " by using the -overwrite option"); 1151 return EXIT_FAILURE; 1152 } 1153 } 1154 } 1155 1156 // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot> 1157 // The snapshot references must be copied before the hfiles otherwise the cleaner 1158 // will remove them because they are unreferenced. 1159 List<Path> travesedPaths = new ArrayList<>(); 1160 boolean copySucceeded = false; 1161 try { 1162 LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir); 1163 travesedPaths = 1164 FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf, 1165 conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS)); 1166 copySucceeded = true; 1167 } catch (IOException e) { 1168 throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir 1169 + " to=" + initialOutputSnapshotDir, e); 1170 } finally { 1171 if (copySucceeded) { 1172 if (filesUser != null || filesGroup != null) { 1173 LOG.warn( 1174 (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser) 1175 + (filesGroup == null 1176 ? "" 1177 : ", Change the group of " + needSetOwnerDir + " to " + filesGroup)); 1178 setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths); 1179 } 1180 if (filesMode > 0) { 1181 LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode); 1182 setPermissionParallel(outputFs, (short) filesMode, travesedPaths, conf); 1183 } 1184 } 1185 } 1186 1187 // Write a new .snapshotinfo if the target name is different from the source name or we want to 1188 // reset TTL for target snapshot. 1189 if (!targetName.equals(snapshotName) || resetTtl) { 1190 SnapshotDescription.Builder snapshotDescBuilder = 1191 SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder(); 1192 if (!targetName.equals(snapshotName)) { 1193 snapshotDescBuilder.setName(targetName); 1194 } 1195 if (resetTtl) { 1196 snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL); 1197 } 1198 SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(), 1199 initialOutputSnapshotDir, outputFs); 1200 if (filesUser != null || filesGroup != null) { 1201 outputFs.setOwner( 1202 new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser, 1203 filesGroup); 1204 } 1205 if (filesMode > 0) { 1206 outputFs.setPermission( 1207 new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), 1208 new FsPermission((short) filesMode)); 1209 } 1210 } 1211 1212 // Step 2 - Start MR Job to copy files 1213 // The snapshot references must be copied before the files otherwise the files gets removed 1214 // by the HFileArchiver, since they have no references. 1215 try { 1216 runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser, 1217 filesGroup, filesMode, mappers, bandwidthMB); 1218 1219 LOG.info("Finalize the Snapshot Export"); 1220 if (!skipTmp) { 1221 // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot> 1222 if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) { 1223 throw new ExportSnapshotException("Unable to rename snapshot directory from=" 1224 + snapshotTmpDir + " to=" + outputSnapshotDir); 1225 } 1226 } 1227 1228 // Step 4 - Verify snapshot integrity 1229 if (verifyTarget) { 1230 LOG.info("Verify the exported snapshot's expiration status and integrity."); 1231 SnapshotDescription targetSnapshotDesc = 1232 SnapshotDescriptionUtils.readSnapshotInfo(outputFs, outputSnapshotDir); 1233 verifySnapshot(targetSnapshotDesc, destConf, outputFs, outputRoot, outputSnapshotDir); 1234 } 1235 1236 LOG.info("Export Completed: " + targetName); 1237 return EXIT_SUCCESS; 1238 } catch (Exception e) { 1239 LOG.error("Snapshot export failed", e); 1240 if (!skipTmp) { 1241 outputFs.delete(snapshotTmpDir, true); 1242 } 1243 outputFs.delete(outputSnapshotDir, true); 1244 return EXIT_FAILURE; 1245 } 1246 } 1247 1248 @Override 1249 protected void printUsage() { 1250 super.printUsage(); 1251 System.out.println("\n" + "Examples:\n" + " hbase snapshot export \\\n" 1252 + " --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n" 1253 + " --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" + "\n" 1254 + " hbase snapshot export \\\n" 1255 + " --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n" 1256 + " --copy-to hdfs://srv1:50070/hbase"); 1257 } 1258 1259 @Override 1260 protected void addOptions() { 1261 addRequiredOption(Options.SNAPSHOT); 1262 addOption(Options.COPY_TO); 1263 addOption(Options.COPY_FROM); 1264 addOption(Options.TARGET_NAME); 1265 addOption(Options.NO_CHECKSUM_VERIFY); 1266 addOption(Options.NO_TARGET_VERIFY); 1267 addOption(Options.NO_SOURCE_VERIFY); 1268 addOption(Options.OVERWRITE); 1269 addOption(Options.CHUSER); 1270 addOption(Options.CHGROUP); 1271 addOption(Options.CHMOD); 1272 addOption(Options.MAPPERS); 1273 addOption(Options.BANDWIDTH); 1274 addOption(Options.RESET_TTL); 1275 } 1276 1277 public static void main(String[] args) { 1278 new ExportSnapshot().doStaticMain(args); 1279 } 1280}