/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce.replication;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication.Verifier.Counters;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This map-only job compares the data from a local table with a remote one. Every cell is compared
 * and must have exactly the same keys (even timestamp) as well as the same value. It is possible
 * to restrict the job by time range and families. The peer id that is provided must match the one
 * given when the replication stream was set up.
 * <p>
 * Two counters are provided, Verifier.Counters.GOODROWS and BADROWS. The reason why a row is
 * different is shown in the map's log.
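 * <p>
 * For example, to verify one hour of data in {@code TestTable} against peer {@code 5} (this
 * mirrors the first example printed by {@code --help}):
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \
 *   --starttime=1265875194289 --endtime=1265878794289 5 TestTable
 * </pre>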
 */
@InterfaceAudience.Private
public class VerifyReplication extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(VerifyReplication.class);

  public final static String NAME = "verifyrep";
  private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
  private static ThreadPoolExecutor reCompareExecutor = null;
  int reCompareTries = 0;
  int reCompareBackoffExponent = 0;
  int reCompareThreads = 0;
  int sleepMsBeforeReCompare = 0;
  long startTime = 0;
  long endTime = Long.MAX_VALUE;
  int batch = -1;
  int versions = -1;
  String tableName = null;
  String families = null;
  String delimiter = "";
  String peerId = null;
  String peerQuorumAddress = null;
  String rowPrefixes = null;
  boolean verbose = false;
  boolean includeDeletedCells = false;
  // Source table snapshot name
  String sourceSnapshotName = null;
  // Temp location in source cluster to restore source snapshot
  String sourceSnapshotTmpDir = null;
  // Peer table snapshot name
  String peerSnapshotName = null;
  // Temp location in peer cluster to restore peer snapshot
  String peerSnapshotTmpDir = null;
  // Peer cluster Hadoop FS address
  String peerFSAddress = null;
  // Peer cluster HBase root dir location
  String peerHBaseRootAddress = null;
  // Peer Table Name
  String peerTableName = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  /**
   * Map-only comparator for 2 tables
   */
  public static class Verifier extends TableMapper<ImmutableBytesWritable, Put> {

    public enum Counters {
      GOODROWS,
      BADROWS,
      ONLY_IN_SOURCE_TABLE_ROWS,
      ONLY_IN_PEER_TABLE_ROWS,
      CONTENT_DIFFERENT_ROWS,
      RECOMPARES,
      MAIN_THREAD_RECOMPARES,
      SOURCE_ROW_CHANGED,
      PEER_ROW_CHANGED,
      FAILED_RECOMPARE
    }

    private Connection sourceConnection;
    private Table sourceTable;
    private Connection replicatedConnection;
    private Table replicatedTable;
    private ResultScanner replicatedScanner;
    private Result currentCompareRowInPeerTable;
    private Scan tableScan;
    private int reCompareTries;
    private int reCompareBackoffExponent;
    private int sleepMsBeforeReCompare;
    private String delimiter = "";
    private boolean verbose = false;
    private int batch = -1;

    /**
     * Map method that compares every scanned row with the equivalent from the remote cluster.
     * @param row The current table row key.
     * @param value The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, final Result value, Context context)
      throws IOException {
      if (replicatedScanner == null) {
        Configuration conf = context.getConfiguration();
        reCompareTries = conf.getInt(NAME + ".recompareTries", 0);
        reCompareBackoffExponent = conf.getInt(NAME + ".recompareBackoffExponent", 1);
        sleepMsBeforeReCompare = conf.getInt(NAME + ".sleepMsBeforeReCompare", 0);
        if (sleepMsBeforeReCompare > 0) {
          reCompareTries = Math.max(reCompareTries, 1);
        }
        delimiter = conf.get(NAME + ".delimiter", "");
        verbose = conf.getBoolean(NAME + ".verbose", false);
        batch = conf.getInt(NAME + ".batch", -1);
        final Scan scan = new Scan();
        if (batch > 0) {
          scan.setBatch(batch);
        }
        scan.setCacheBlocks(false);
        scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
        long startTime = conf.getLong(NAME + ".startTime", 0);
        long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
        String families = conf.get(NAME + ".families", null);
        if (families != null) {
          String[] fams = families.split(",");
          for (String fam : fams) {
            scan.addFamily(Bytes.toBytes(fam));
          }
        }
        boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false);
        scan.setRaw(includeDeletedCells);
        String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
        setRowPrefixFilter(scan, rowPrefixes);
        scan.setTimeRange(startTime, endTime);
        int versions = conf.getInt(NAME + ".versions", -1);
        LOG.info("Setting number of versions inside map as: " + versions);
        if (versions >= 0) {
          scan.setMaxVersions(versions);
        }
        int reCompareThreads = conf.getInt(NAME + ".recompareThreads", 0);
        reCompareExecutor = buildReCompareExecutor(reCompareThreads, context);
        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
        sourceConnection = ConnectionFactory.createConnection(conf);
        sourceTable = sourceConnection.getTable(tableName);
        tableScan = scan;

        final InputSplit tableSplit = context.getInputSplit();

        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
        Configuration peerConf =
          HBaseConfiguration.createClusterConf(conf, zkClusterKey, PEER_CONFIG_PREFIX);

        String peerName = peerConf.get(NAME + ".peerTableName", tableName.getNameAsString());
        TableName peerTableName = TableName.valueOf(peerName);
        replicatedConnection = ConnectionFactory.createConnection(peerConf);
        replicatedTable = replicatedConnection.getTable(peerTableName);
        scan.setStartRow(value.getRow());

        byte[] endRow = null;
        if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) {
          endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegionInfo()
            .getEndKey();
        } else {
          endRow = ((TableSplit) tableSplit).getEndRow();
        }

        scan.setStopRow(endRow);

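        // If a peer snapshot was supplied, read the peer side directly from the restored
        // snapshot files rather than scanning the live peer table, which gives a stable,
        // point-in-time view of the peer.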
        String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null);
        if (peerSnapshotName != null) {
          String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null);
          String peerFSAddress = conf.get(NAME + ".peerFSAddress", null);
          String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null);
          FileSystem.setDefaultUri(peerConf, peerFSAddress);
          CommonFSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
          LOG.info("Using peer snapshot: " + peerSnapshotName + " with temp dir: "
            + peerSnapshotTmpDir + " peer root uri: " + CommonFSUtils.getRootDir(peerConf)
            + " peerFSAddress: " + peerFSAddress);

          replicatedScanner = new TableSnapshotScanner(peerConf, CommonFSUtils.getRootDir(peerConf),
            new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true);
        } else {
          replicatedScanner = replicatedTable.getScanner(scan);
        }
        currentCompareRowInPeerTable = replicatedScanner.next();
      }
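      // Both scanners return rows in row-key order, so the comparison below is a sorted merge:
      // equal keys are compared cell by cell, a smaller source key means the row is missing on
      // the peer, and a smaller peer key means the row is missing on the source.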
      while (true) {
        if (currentCompareRowInPeerTable == null) {
          // reached the region end of the peer table; row exists only in the source table
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
          break;
        }
        int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
        if (rowCmpRet == 0) {
          // rowkey is the same, need to compare the content of the row
          try {
            Result.compareResults(value, currentCompareRowInPeerTable, false);
            context.getCounter(Counters.GOODROWS).increment(1);
            if (verbose) {
              LOG.info(
                "Good row key: " + delimiter + Bytes.toStringBinary(value.getRow()) + delimiter);
            }
          } catch (Exception e) {
            logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value,
              currentCompareRowInPeerTable);
          }
          currentCompareRowInPeerTable = replicatedScanner.next();
          break;
        } else if (rowCmpRet < 0) {
          // row only exists in source table
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
          break;
        } else {
          // row only exists in peer table
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null,
            currentCompareRowInPeerTable);
          currentCompareRowInPeerTable = replicatedScanner.next();
        }
      }
    }

    @SuppressWarnings("FutureReturnValueIgnored")
    private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row,
      Result replicatedRow) {
      byte[] rowKey = getRow(row, replicatedRow);
      if (reCompareTries == 0) {
        context.getCounter(counter).increment(1);
        context.getCounter(Counters.BADROWS).increment(1);
        LOG.error("{}, rowkey={}{}{}", counter, delimiter, Bytes.toStringBinary(rowKey), delimiter);
        return;
      }

      VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
        row, replicatedRow, counter, delimiter, tableScan, sourceTable, replicatedTable,
        reCompareTries, sleepMsBeforeReCompare, reCompareBackoffExponent, verbose);

      if (reCompareExecutor == null) {
        runnable.run();
        return;
      }

      reCompareExecutor.submit(runnable);
    }

    @Override
    protected void cleanup(Context context) {
      if (reCompareExecutor != null && !reCompareExecutor.isShutdown()) {
        reCompareExecutor.shutdown();
        try {
          boolean terminated = reCompareExecutor.awaitTermination(1, TimeUnit.MINUTES);
          if (!terminated) {
            List<Runnable> queue = reCompareExecutor.shutdownNow();
            for (Runnable runnable : queue) {
              ((VerifyReplicationRecompareRunnable) runnable).fail();
            }

            terminated = reCompareExecutor.awaitTermination(1, TimeUnit.MINUTES);

            if (!terminated) {
              int activeCount = Math.max(1, reCompareExecutor.getActiveCount());
              LOG.warn("Found {} possible recompares still running in the executor;"
                + " incrementing BADROWS and FAILED_RECOMPARE", activeCount);
              context.getCounter(Counters.BADROWS).increment(activeCount);
              context.getCounter(Counters.FAILED_RECOMPARE).increment(activeCount);
            }
          }
        } catch (InterruptedException e) {
          throw new RuntimeException("Failed to await executor termination in cleanup", e);
        }
      }
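      // Drain whatever is left in the peer scanner: once the source side is exhausted, every
      // remaining peer row can only exist in the peer table.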
      if (replicatedScanner != null) {
        try {
          while (currentCompareRowInPeerTable != null) {
            logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null,
              currentCompareRowInPeerTable);
            currentCompareRowInPeerTable = replicatedScanner.next();
          }
        } catch (Exception e) {
          LOG.error("Failed to scan peer table in cleanup", e);
        } finally {
          replicatedScanner.close();
          replicatedScanner = null;
        }
      }

      if (sourceTable != null) {
        try {
          sourceTable.close();
        } catch (IOException e) {
          LOG.error("Failed to close source table in cleanup", e);
        }
      }
      if (sourceConnection != null) {
        try {
          sourceConnection.close();
        } catch (Exception e) {
          LOG.error("Failed to close source connection in cleanup", e);
        }
      }

      if (replicatedTable != null) {
        try {
          replicatedTable.close();
        } catch (Exception e) {
          LOG.error("Failed to close replicated table in cleanup", e);
        }
      }
      if (replicatedConnection != null) {
        try {
          replicatedConnection.close();
        } catch (Exception e) {
          LOG.error("Failed to close replicated connection in cleanup", e);
        }
      }
    }
  }

  private static Pair<ReplicationPeerConfig, Configuration>
    getPeerQuorumConfig(final Configuration conf, String peerId) throws IOException {
    ZKWatcher localZKW = null;
    try {
      localZKW = new ZKWatcher(conf, "VerifyReplication", new Abortable() {
        @Override
        public void abort(String why, Throwable e) {
        }

        @Override
        public boolean isAborted() {
          return false;
        }
      });
      ReplicationPeerStorage storage =
        ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), localZKW, conf);
      ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
      return Pair.newPair(peerConfig,
        ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf));
    } catch (ReplicationException e) {
      throw new IOException("An error occurred while trying to connect to the remote peer cluster",
        e);
    } finally {
      if (localZKW != null) {
        localZKW.close();
      }
    }
  }

  private void restoreSnapshotForPeerCluster(Configuration conf, String peerQuorumAddress)
    throws IOException {
    Configuration peerConf =
      HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX);
    FileSystem.setDefaultUri(peerConf, peerFSAddress);
    CommonFSUtils.setRootDir(peerConf, new Path(peerFSAddress, peerHBaseRootAddress));
    FileSystem fs = FileSystem.get(peerConf);
    RestoreSnapshotHelper.copySnapshotForScanner(peerConf, fs, CommonFSUtils.getRootDir(peerConf),
      new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName);
  }

  /**
   * Sets up the actual job.
   * @param conf The current configuration.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws java.io.IOException When setting up the job fails.
   */
  public Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }
    conf.set(NAME + ".tableName", tableName);
    conf.setLong(NAME + ".startTime", startTime);
    conf.setLong(NAME + ".endTime", endTime);
    conf.setInt(NAME + ".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
    conf.set(NAME + ".delimiter", delimiter);
    conf.setInt(NAME + ".batch", batch);
    conf.setBoolean(NAME + ".verbose", verbose);
    conf.setBoolean(NAME + ".includeDeletedCells", includeDeletedCells);
    if (families != null) {
      conf.set(NAME + ".families", families);
    }
    if (rowPrefixes != null) {
      conf.set(NAME + ".rowPrefixes", rowPrefixes);
    }

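    // Resolve the peer either from a replication peer id (looked up in the source cluster's
    // replication peer storage) or from an explicit cluster key; any verifyrep.peer.* overrides
    // are applied on top of the derived peer configuration.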
    String peerQuorumAddress;
    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = null;
    if (peerId != null) {
      peerConfigPair = getPeerQuorumConfig(conf, peerId);
      ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
      peerQuorumAddress = peerConfig.getClusterKey();
      LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: "
        + peerConfig.getConfiguration());
      conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
      HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
        peerConfig.getConfiguration().entrySet());
    } else {
      assert this.peerQuorumAddress != null;
      peerQuorumAddress = this.peerQuorumAddress;
      LOG.info("Peer Quorum Address: " + peerQuorumAddress);
      conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
    }

    if (peerTableName != null) {
      LOG.info("Peer Table Name: " + peerTableName);
      conf.set(NAME + ".peerTableName", peerTableName);
    }

    conf.setInt(NAME + ".versions", versions);
    LOG.info("Number of versions: " + versions);

    conf.setInt(NAME + ".recompareTries", reCompareTries);
    conf.setInt(NAME + ".recompareBackoffExponent", reCompareBackoffExponent);
    conf.setInt(NAME + ".recompareThreads", reCompareThreads);

    // Set snapshot-specific parameters
    if (peerSnapshotName != null) {
      conf.set(NAME + ".peerSnapshotName", peerSnapshotName);

      // for verifyRep by snapshot, choose a unique sub-directory under peerSnapshotTmpDir to
      // restore the snapshot
      Path restoreDir = new Path(peerSnapshotTmpDir, UUID.randomUUID().toString());
      peerSnapshotTmpDir = restoreDir.toString();
      conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir);

      conf.set(NAME + ".peerFSAddress", peerFSAddress);
      conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress);

      // This is to create an HDFS delegation token for the peer cluster in case it is secured
      conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress, conf.get(HConstants.HBASE_DIR));
    }

    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(VerifyReplication.class);

    Scan scan = new Scan();
    scan.setTimeRange(startTime, endTime);
    scan.setRaw(includeDeletedCells);
    scan.setCacheBlocks(false);
    if (batch > 0) {
      scan.setBatch(batch);
    }
    if (versions >= 0) {
      scan.setMaxVersions(versions);
      LOG.info("Number of versions set to " + versions);
    }
    if (families != null) {
      String[] fams = families.split(",");
      for (String fam : fams) {
        scan.addFamily(Bytes.toBytes(fam));
      }
    }

    setRowPrefixFilter(scan, rowPrefixes);

    if (sourceSnapshotName != null) {
      Path snapshotTempPath = new Path(sourceSnapshotTmpDir);
      LOG.info(
        "Using source snapshot: " + sourceSnapshotName + " with temp dir: " + sourceSnapshotTmpDir);
      TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
        null, job, true, snapshotTempPath);
      restoreSnapshotForPeerCluster(conf, peerQuorumAddress);
    } else {
      TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
    }

    Configuration peerClusterConf;
    if (peerId != null) {
      assert peerConfigPair != null;
      peerClusterConf = peerConfigPair.getSecond();
    } else {
      peerClusterConf =
        HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX);
    }
    // Obtain the auth token from the peer cluster
    TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);

    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);
    return job;
  }

  protected static byte[] getRow(Result sourceResult, Result replicatedResult) {
    if (sourceResult != null) {
      return sourceResult.getRow();
    } else if (replicatedResult != null) {
      return replicatedResult.getRow();
    }
    throw new RuntimeException("Both sourceResult and replicatedResult are null!");
  }

  private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
    if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
      String[] rowPrefixArray = rowPrefixes.split(",");
      Arrays.sort(rowPrefixArray);
      FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
      for (String prefix : rowPrefixArray) {
        Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
        filterList.addFilter(filter);
      }
      scan.setFilter(filterList);
      byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
      byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length - 1]);
      setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
    }
  }

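  // The scan starts at the smallest prefix and stops at an exclusive upper bound built by
  // incrementing the last byte of the greatest prefix, so every row beginning with one of the
  // sorted prefixes falls inside [startPrefixRow, stopRow). Note this assumes the last byte of
  // the greatest prefix is not 0xFF.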
  private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
    scan.setStartRow(startPrefixRow);
    byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1),
      new byte[] { (byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1) });
    scan.setStopRow(stopRow);
  }

  public boolean doCommandLine(final String[] args) {
    if (args.length < 2) {
      printUsage(null);
      return false;
    }
    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String includeDeletedCellsArgKey = "--raw";
        if (cmd.equals(includeDeletedCellsArgKey)) {
          includeDeletedCells = true;
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String batchArgKey = "--batch=";
        if (cmd.startsWith(batchArgKey)) {
          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          families = cmd.substring(familiesArgKey.length());
          continue;
        }

        final String rowPrefixesKey = "--row-prefixes=";
        if (cmd.startsWith(rowPrefixesKey)) {
          rowPrefixes = cmd.substring(rowPrefixesKey.length());
          continue;
        }

        final String delimiterArgKey = "--delimiter=";
        if (cmd.startsWith(delimiterArgKey)) {
          delimiter = cmd.substring(delimiterArgKey.length());
          continue;
        }

        final String deprecatedSleepToReCompareKey = "--recomparesleep=";
        final String sleepToReCompareKey = "--recompareSleep=";
        if (cmd.startsWith(deprecatedSleepToReCompareKey)) {
          LOG.warn("--recomparesleep is deprecated and will be removed in 4.0.0."
            + " Use --recompareSleep instead.");
          sleepMsBeforeReCompare =
            Integer.parseInt(cmd.substring(deprecatedSleepToReCompareKey.length()));
          continue;
        }
        if (cmd.startsWith(sleepToReCompareKey)) {
          sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
          continue;
        }

        final String verboseKey = "--verbose";
        if (cmd.startsWith(verboseKey)) {
          verbose = true;
          continue;
        }

        final String sourceSnapshotNameArgKey = "--sourceSnapshotName=";
        if (cmd.startsWith(sourceSnapshotNameArgKey)) {
          sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length());
          continue;
        }

        final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir=";
        if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) {
          sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length());
          continue;
        }

        final String peerSnapshotNameArgKey = "--peerSnapshotName=";
        if (cmd.startsWith(peerSnapshotNameArgKey)) {
          peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length());
          continue;
        }

        final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir=";
        if (cmd.startsWith(peerSnapshotTmpDirArgKey)) {
          peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length());
          continue;
        }

        final String peerFSAddressArgKey = "--peerFSAddress=";
        if (cmd.startsWith(peerFSAddressArgKey)) {
          peerFSAddress = cmd.substring(peerFSAddressArgKey.length());
          continue;
        }

        final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress=";
        if (cmd.startsWith(peerHBaseRootAddressArgKey)) {
          peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length());
          continue;
        }

        final String peerTableNameArgKey = "--peerTableName=";
        if (cmd.startsWith(peerTableNameArgKey)) {
          peerTableName = cmd.substring(peerTableNameArgKey.length());
          continue;
        }

        final String reCompareThreadArgs = "--recompareThreads=";
        if (cmd.startsWith(reCompareThreadArgs)) {
          reCompareThreads = Integer.parseInt(cmd.substring(reCompareThreadArgs.length()));
          continue;
        }

        final String reCompareTriesKey = "--recompareTries=";
        if (cmd.startsWith(reCompareTriesKey)) {
          reCompareTries = Integer.parseInt(cmd.substring(reCompareTriesKey.length()));
          continue;
        }

        final String reCompareBackoffExponentKey = "--recompareBackoffExponent=";
        if (cmd.startsWith(reCompareBackoffExponentKey)) {
          reCompareBackoffExponent =
            Integer.parseInt(cmd.substring(reCompareBackoffExponentKey.length()));
          continue;
        }

        if (cmd.startsWith("--")) {
          printUsage("Invalid argument '" + cmd + "'");
          return false;
        }

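        // The two trailing positional arguments are <peerid|peerQuorumAddress> and <tablename>;
        // a value that parses as a valid cluster key is treated as a quorum address, anything
        // else as a peer id.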
        if (i == args.length - 2) {
          if (isPeerQuorumAddress(cmd)) {
            peerQuorumAddress = cmd;
          } else {
            peerId = cmd;
          }
        }

        if (i == args.length - 1) {
          tableName = cmd;
        }
      }

      if (
        (sourceSnapshotName != null && sourceSnapshotTmpDir == null)
          || (sourceSnapshotName == null && sourceSnapshotTmpDir != null)
      ) {
        printUsage("Source snapshot name and snapshot temp location should be provided"
          + " to use snapshots in source cluster");
        return false;
      }

      if (
        peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null
          || peerHBaseRootAddress != null
      ) {
        if (
          peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null
            || peerHBaseRootAddress == null
        ) {
          printUsage(
            "Peer snapshot name, peer snapshot temp location, peer HBase root address and"
              + " peer FS address should be provided to use snapshots in peer cluster");
          return false;
        }
      }

      // This is to avoid making recompare calls to source/peer tables when snapshots are used
      if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) {
        printUsage(
          "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are"
            + " immutable");
        return false;
      }

    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  private boolean isPeerQuorumAddress(String cmd) {
    try {
      ZKConfig.validateClusterKey(cmd);
    } catch (IOException e) {
      // not a quorum address
      return false;
    }
    return true;
  }

  /*
   * @param errorMsg Error message. Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: verifyrep [--starttime=X]"
      + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recompareSleep=]"
      + " [--recompareThreads=] [--recompareTries=] [--recompareBackoffExponent=]"
      + " [--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P]"
      + " [--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S]"
      + " [--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" starttime    beginning of the time range");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range");
    System.err.println(" versions     number of cell versions to verify");
    System.err.println(" batch        batch count for scan; note that"
      + " result row counts will no longer be the actual number of rows when you use this option");
    System.err.println(" raw          includes raw scan if given in options");
    System.err.println(" families     comma-separated list of families to copy");
    System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on");
    System.err.println(" delimiter    the delimiter used in display around rowkey");
    System.err.println(" recompareSleep   milliseconds to sleep before recomparing a row,"
      + " default value is 0 which disables the recompare.");
    System.err.println(" recompareThreads number of threads to run recompares in");
    System.err.println(" recompareTries   number of recompare attempts before incrementing"
      + " the BADROWS counter. Defaults to 1 recompare");
    System.err.println(" recompareBackoffExponent exponential multiplier to increase"
      + " recompareSleep after each recompare attempt,"
      + " default value is 0 which results in a constant sleep time");
    System.err.println(" verbose      logs row keys of good rows");
    System.err.println(" peerTableName        Peer Table Name");
    System.err.println(" sourceSnapshotName   Source Snapshot Name");
    System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot");
    System.err.println(" peerSnapshotName     Peer Snapshot Name");
    System.err.println(" peerSnapshotTmpDir   Tmp location to restore peer table snapshot");
    System.err.println(" peerFSAddress        Peer cluster Hadoop FS address");
    System.err.println(" peerHBaseRootAddress Peer cluster HBase root location");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" peerid       Id of the peer used for verification,"
      + " must match the one given for replication");
    System.err.println(" peerQuorumAddress    quorumAddress of the peer used for verification. The"
      + " format is zk_quorum:zk_port:zk_hbase_path");
    System.err.println(" tablename    Name of the table to verify");
    System.err.println();
    System.err.println("Examples:");
    System.err
      .println(" To verify the data replicated from TestTable for a 1 hour window with peer #5");
    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication"
      + " --starttime=1265875194289 --endtime=1265878794289 5 TestTable");
    System.err.println();
    System.err.println(" To verify the data in TestTable between the cluster running"
      + " VerifyReplication and cluster-b");
    System.err.println(" Assume the quorum address for cluster-b is"
      + " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:2181:/cluster-b");
    System.err
      .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n"
        + "     cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
        + "2181:/cluster-b \\\n" + "     TestTable");
    System.err.println();
    System.err.println(" To verify the data in TestTable between the secured cluster running"
      + " VerifyReplication and insecure cluster-b");
    System.err
      .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n"
        + "     -D verifyrep.peer.hbase.security.authentication=simple \\\n"
        + "     cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
        + "2181:/cluster-b \\\n" + "     TestTable");
    System.err.println();
    System.err.println(" To verify the data in TestTable between"
      + " the secured cluster running VerifyReplication and secured cluster-b");
    System.err.println(" Assume cluster-b uses a different kerberos principal,"
      + " cluster-b/_HOST@EXAMPLE.COM, for its master and regionservers");
    System.err
      .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n"
        + "     -D verifyrep.peer.hbase.regionserver.kerberos.principal="
        + "cluster-b/_HOST@EXAMPLE.COM \\\n"
        + "     -D verifyrep.peer.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n"
        + "     cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
        + "2181:/cluster-b \\\n" + "     TestTable");
    System.err.println();
    System.err.println(" To verify the data in TestTable between the insecure cluster running"
      + " VerifyReplication and secured cluster-b");
    System.err
      .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n"
        + "     -D verifyrep.peer.hbase.security.authentication=kerberos \\\n"
        + "     -D verifyrep.peer.hbase.regionserver.kerberos.principal="
        + "cluster-b/_HOST@EXAMPLE.COM \\\n"
        + "     -D verifyrep.peer.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n"
        + "     cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
        + "2181:/cluster-b \\\n" + "     TestTable");
  }

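  // The recompare executor uses a SynchronousQueue, i.e. no task queue at all: a submit only
  // succeeds if an idle thread is available. Combined with the CallerRunsPolicy below, a rejected
  // recompare simply runs in the mapper thread, throttling the map speed instead of buffering an
  // unbounded backlog.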
  private static ThreadPoolExecutor buildReCompareExecutor(int maxThreads,
    Mapper.Context context) {
    if (maxThreads == 0) {
      return null;
    }

    return new ThreadPoolExecutor(0, maxThreads, 1L, TimeUnit.SECONDS, new SynchronousQueue<>(),
      buildRejectedReComparePolicy(context));
  }

  private static CallerRunsPolicy buildRejectedReComparePolicy(Mapper.Context context) {
    return new CallerRunsPolicy() {
      @Override
      public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
        LOG.debug("Re-comparison execution rejected. Running in main thread.");
        context.getCounter(Counters.MAIN_THREAD_RECOMPARES).increment(1);
        // will run in the current thread
        super.rejectedExecution(runnable, e);
      }
    };
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = this.getConf();
    Job job = createSubmittableJob(conf, args);
    if (job != null) {
      return job.waitForCompletion(true) ? 0 : 1;
    }
    return 1;
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
    System.exit(res);
  }
}