001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileStatus; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.CellUtil; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtility; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.Admin; 040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 042import org.apache.hadoop.hbase.client.Connection; 043import org.apache.hadoop.hbase.client.ConnectionFactory; 044import org.apache.hadoop.hbase.client.Delete; 045import org.apache.hadoop.hbase.client.Get; 046import org.apache.hadoop.hbase.client.Put; 047import org.apache.hadoop.hbase.client.Result; 048import org.apache.hadoop.hbase.client.ResultScanner; 049import org.apache.hadoop.hbase.client.Scan; 050import org.apache.hadoop.hbase.client.Table; 051import org.apache.hadoop.hbase.client.TableDescriptor; 052import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 053import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; 054import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; 055import org.apache.hadoop.hbase.testclassification.LargeTests; 056import org.apache.hadoop.hbase.testclassification.ReplicationTests; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.hadoop.hbase.util.CommonFSUtils; 059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 060import org.apache.hadoop.mapreduce.Counters; 061import org.apache.hadoop.mapreduce.Job; 062import org.junit.AfterClass; 063import org.junit.Before; 064import org.junit.BeforeClass; 065import org.junit.ClassRule; 066import org.junit.Rule; 067import org.junit.Test; 068import org.junit.experimental.categories.Category; 069import org.junit.rules.TestName; 070import org.slf4j.Logger; 071import org.slf4j.LoggerFactory; 072 073@Category({ ReplicationTests.class, LargeTests.class }) 074public class TestVerifyReplication extends TestReplicationBase { 075 076 @ClassRule 077 public static final HBaseClassTestRule CLASS_RULE = 078 HBaseClassTestRule.forClass(TestVerifyReplication.class); 079 080 private static final Logger LOG = LoggerFactory.getLogger(TestVerifyReplication.class); 081 082 private static final String PEER_ID = "2"; 083 private static final TableName peerTableName = TableName.valueOf("peerTest"); 084 private static Table htable3; 085 086 @Rule 087 public TestName name = new TestName(); 088 089 @Before 090 public void setUp() throws Exception { 091 cleanUp(); 092 UTIL2.deleteTableData(peerTableName); 093 } 094 095 @BeforeClass 096 public static void setUpBeforeClass() throws Exception { 097 TestReplicationBase.setUpBeforeClass(); 098 099 TableDescriptor peerTable = 100 TableDescriptorBuilder.newBuilder(peerTableName) 101 .setColumnFamily( 102 ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100).build()) 103 .build(); 104 105 Connection connection2 = ConnectionFactory.createConnection(CONF2); 106 try (Admin admin2 = connection2.getAdmin()) { 107 admin2.createTable(peerTable, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); 108 } 109 htable3 = connection2.getTable(peerTableName); 110 } 111 112 static Counters runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows) 113 throws IOException, InterruptedException, ClassNotFoundException { 114 Job job = new VerifyReplication().createSubmittableJob(new Configuration(CONF1), args); 115 if (job == null) { 116 fail("Job wasn't created, see the log"); 117 } 118 if (!job.waitForCompletion(true)) { 119 fail("Job failed, see the log"); 120 } 121 assertEquals(expectedGoodRows, 122 job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); 123 assertEquals(expectedBadRows, 124 job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); 125 return job.getCounters(); 126 } 127 128 /** 129 * Do a small loading into a table, make sure the data is really the same, then run the 130 * VerifyReplication job to check the results. Do a second comparison where all the cells are 131 * different. 132 */ 133 @Test 134 public void testVerifyRepJob() throws Exception { 135 // Populate the tables, at the same time it guarantees that the tables are 136 // identical since it does the check 137 runSmallBatchTest(); 138 139 String[] args = new String[] { PEER_ID, tableName.getNameAsString() }; 140 runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); 141 142 Scan scan = new Scan(); 143 ResultScanner rs = htable2.getScanner(scan); 144 Put put = null; 145 for (Result result : rs) { 146 put = new Put(result.getRow()); 147 Cell firstVal = result.rawCells()[0]; 148 put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal), 149 Bytes.toBytes("diff data")); 150 htable2.put(put); 151 } 152 Delete delete = new Delete(put.getRow()); 153 htable2.delete(delete); 154 runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); 155 } 156 157 /** 158 * Load a row into a table, make sure the data is really the same, delete the row, make sure the 159 * delete marker is replicated, run verify replication with and without raw to check the results. 160 */ 161 @Test 162 public void testVerifyRepJobWithRawOptions() throws Exception { 163 LOG.info(name.getMethodName()); 164 165 final TableName tableName = TableName.valueOf(name.getMethodName()); 166 byte[] familyname = Bytes.toBytes("fam_raw"); 167 byte[] row = Bytes.toBytes("row_raw"); 168 169 Table lHtable1 = null; 170 Table lHtable2 = null; 171 172 try { 173 ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname) 174 .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build(); 175 TableDescriptor table = 176 TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(fam).build(); 177 178 Connection connection1 = ConnectionFactory.createConnection(CONF1); 179 Connection connection2 = ConnectionFactory.createConnection(CONF2); 180 try (Admin admin1 = connection1.getAdmin()) { 181 admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); 182 } 183 try (Admin admin2 = connection2.getAdmin()) { 184 admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); 185 } 186 UTIL1.waitUntilAllRegionsAssigned(tableName); 187 UTIL2.waitUntilAllRegionsAssigned(tableName); 188 189 lHtable1 = UTIL1.getConnection().getTable(tableName); 190 lHtable2 = UTIL2.getConnection().getTable(tableName); 191 192 Put put = new Put(row); 193 put.addColumn(familyname, row, row); 194 lHtable1.put(put); 195 196 Get get = new Get(row); 197 for (int i = 0; i < NB_RETRIES; i++) { 198 if (i == NB_RETRIES - 1) { 199 fail("Waited too much time for put replication"); 200 } 201 Result res = lHtable2.get(get); 202 if (res.isEmpty()) { 203 LOG.info("Row not available"); 204 Thread.sleep(SLEEP_TIME); 205 } else { 206 assertArrayEquals(res.value(), row); 207 break; 208 } 209 } 210 211 Delete del = new Delete(row); 212 lHtable1.delete(del); 213 214 get = new Get(row); 215 for (int i = 0; i < NB_RETRIES; i++) { 216 if (i == NB_RETRIES - 1) { 217 fail("Waited too much time for del replication"); 218 } 219 Result res = lHtable2.get(get); 220 if (res.size() >= 1) { 221 LOG.info("Row not deleted"); 222 Thread.sleep(SLEEP_TIME); 223 } else { 224 break; 225 } 226 } 227 228 // Checking verifyReplication for the default behavior. 229 String[] argsWithoutRaw = new String[] { PEER_ID, tableName.getNameAsString() }; 230 runVerifyReplication(argsWithoutRaw, 0, 0); 231 232 // Checking verifyReplication with raw 233 String[] argsWithRawAsTrue = new String[] { "--raw", PEER_ID, tableName.getNameAsString() }; 234 runVerifyReplication(argsWithRawAsTrue, 1, 0); 235 } finally { 236 if (lHtable1 != null) { 237 lHtable1.close(); 238 } 239 if (lHtable2 != null) { 240 lHtable2.close(); 241 } 242 } 243 } 244 245 static void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount) 246 throws IOException { 247 FileSystem fs = FileSystem.get(conf); 248 FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir)); 249 assertNotNull(subDirectories); 250 assertEquals(subDirectories.length, expectedCount); 251 for (int i = 0; i < expectedCount; i++) { 252 assertTrue(subDirectories[i].isDirectory()); 253 } 254 } 255 256 @Test 257 public void testVerifyRepJobWithQuorumAddress() throws Exception { 258 // Populate the tables, at the same time it guarantees that the tables are 259 // identical since it does the check 260 runSmallBatchTest(); 261 262 // with a quorum address (a cluster key) 263 String[] args = new String[] { UTIL2.getClusterKey(), tableName.getNameAsString() }; 264 runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); 265 266 Scan scan = new Scan(); 267 ResultScanner rs = htable2.getScanner(scan); 268 Put put = null; 269 for (Result result : rs) { 270 put = new Put(result.getRow()); 271 Cell firstVal = result.rawCells()[0]; 272 put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal), 273 Bytes.toBytes("diff data")); 274 htable2.put(put); 275 } 276 Delete delete = new Delete(put.getRow()); 277 htable2.delete(delete); 278 runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); 279 } 280 281 @Test 282 public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Exception { 283 // Populate the tables, at the same time it guarantees that the tables are 284 // identical since it does the check 285 runSmallBatchTest(); 286 287 // Take source and target tables snapshot 288 Path rootDir = CommonFSUtils.getRootDir(CONF1); 289 FileSystem fs = rootDir.getFileSystem(CONF1); 290 String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime(); 291 SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName, 292 Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true); 293 294 // Take target snapshot 295 Path peerRootDir = CommonFSUtils.getRootDir(CONF2); 296 FileSystem peerFs = peerRootDir.getFileSystem(CONF2); 297 String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime(); 298 SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName, 299 Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true); 300 301 String peerFSAddress = peerFs.getUri().toString(); 302 String tmpPath1 = UTIL1.getRandomDir().toString(); 303 String tmpPath2 = "/tmp" + EnvironmentEdgeManager.currentTime(); 304 305 String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, 306 "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName, 307 "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress, 308 "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(), 309 tableName.getNameAsString() }; 310 runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); 311 checkRestoreTmpDir(CONF1, tmpPath1, 1); 312 checkRestoreTmpDir(CONF2, tmpPath2, 1); 313 314 Scan scan = new Scan(); 315 ResultScanner rs = htable2.getScanner(scan); 316 Put put = null; 317 for (Result result : rs) { 318 put = new Put(result.getRow()); 319 Cell firstVal = result.rawCells()[0]; 320 put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal), 321 Bytes.toBytes("diff data")); 322 htable2.put(put); 323 } 324 Delete delete = new Delete(put.getRow()); 325 htable2.delete(delete); 326 327 sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime(); 328 SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName, 329 Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true); 330 331 peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime(); 332 SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName, 333 Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true); 334 335 args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, 336 "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName, 337 "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress, 338 "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(), 339 tableName.getNameAsString() }; 340 runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); 341 checkRestoreTmpDir(CONF1, tmpPath1, 2); 342 checkRestoreTmpDir(CONF2, tmpPath2, 2); 343 } 344 345 static void runBatchCopyTest() throws Exception { 346 // normal Batch tests for htable1 347 loadData("", row, noRepfamName); 348 349 Scan scan1 = new Scan(); 350 List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH); 351 ResultScanner scanner1 = htable1.getScanner(scan1); 352 Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH); 353 for (Result result : res1) { 354 Put put = new Put(result.getRow()); 355 for (Cell cell : result.rawCells()) { 356 put.add(cell); 357 } 358 puts.add(put); 359 } 360 scanner1.close(); 361 assertEquals(NB_ROWS_IN_BATCH, res1.length); 362 363 // Copy the data to htable3 364 htable3.put(puts); 365 366 Scan scan2 = new Scan(); 367 ResultScanner scanner2 = htable3.getScanner(scan2); 368 Result[] res2 = scanner2.next(NB_ROWS_IN_BATCH); 369 scanner2.close(); 370 assertEquals(NB_ROWS_IN_BATCH, res2.length); 371 } 372 373 @Test 374 public void testVerifyRepJobWithPeerTableName() throws Exception { 375 // Populate the tables with same data 376 runBatchCopyTest(); 377 378 // with a peerTableName along with quorum address (a cluster key) 379 String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(), 380 UTIL2.getClusterKey(), tableName.getNameAsString() }; 381 runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); 382 383 UTIL2.deleteTableData(peerTableName); 384 runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); 385 } 386 387 @Test 388 public void testVerifyRepJobWithPeerTableNameAndSnapshotSupport() throws Exception { 389 // Populate the tables with same data 390 runBatchCopyTest(); 391 392 // Take source and target tables snapshot 393 Path rootDir = CommonFSUtils.getRootDir(CONF1); 394 FileSystem fs = rootDir.getFileSystem(CONF1); 395 String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime(); 396 SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName, 397 Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true); 398 399 // Take target snapshot 400 Path peerRootDir = CommonFSUtils.getRootDir(CONF2); 401 FileSystem peerFs = peerRootDir.getFileSystem(CONF2); 402 String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime(); 403 SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName, 404 Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true); 405 406 String peerFSAddress = peerFs.getUri().toString(); 407 String tmpPath1 = UTIL1.getRandomDir().toString(); 408 String tmpPath2 = "/tmp" + EnvironmentEdgeManager.currentTime(); 409 410 String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(), 411 "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1, 412 "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2, 413 "--peerFSAddress=" + peerFSAddress, 414 "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(), 415 tableName.getNameAsString() }; 416 runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); 417 checkRestoreTmpDir(CONF1, tmpPath1, 1); 418 checkRestoreTmpDir(CONF2, tmpPath2, 1); 419 420 Scan scan = new Scan(); 421 ResultScanner rs = htable3.getScanner(scan); 422 Put put = null; 423 for (Result result : rs) { 424 put = new Put(result.getRow()); 425 Cell firstVal = result.rawCells()[0]; 426 put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal), 427 Bytes.toBytes("diff data")); 428 htable3.put(put); 429 } 430 Delete delete = new Delete(put.getRow()); 431 htable3.delete(delete); 432 433 sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime(); 434 SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName, 435 Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true); 436 437 peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime(); 438 SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName, 439 Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true); 440 441 args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(), 442 "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1, 443 "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2, 444 "--peerFSAddress=" + peerFSAddress, 445 "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(), 446 tableName.getNameAsString() }; 447 runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); 448 checkRestoreTmpDir(CONF1, tmpPath1, 2); 449 checkRestoreTmpDir(CONF2, tmpPath2, 2); 450 } 451 452 @Test 453 public void testVerifyReplicationThreadedRecompares() throws Exception { 454 // Populate the tables with same data 455 runBatchCopyTest(); 456 457 // ONLY_IN_PEER_TABLE_ROWS 458 Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH))); 459 put.addColumn(noRepfamName, row, row); 460 htable3.put(put); 461 462 // CONTENT_DIFFERENT_ROWS 463 put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1))); 464 put.addColumn(noRepfamName, row, Bytes.toBytes("diff value")); 465 htable3.put(put); 466 467 // ONLY_IN_SOURCE_TABLE_ROWS 468 put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1))); 469 put.addColumn(noRepfamName, row, row); 470 htable1.put(put); 471 472 String[] args = new String[] { "--recompareThreads=10", "--recompareTries=3", 473 "--recompareSleep=1", "--peerTableName=" + peerTableName.getNameAsString(), 474 UTIL2.getClusterKey(), tableName.getNameAsString() }; 475 Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3); 476 assertEquals( 477 counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(), 9); 478 assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(), 479 9); 480 assertEquals( 481 counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(), 482 1); 483 assertEquals( 484 counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(), 485 1); 486 assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS) 487 .getValue(), 1); 488 } 489 490 @Test 491 public void testFailsRemainingComparesAfterShutdown() throws Exception { 492 // Populate the tables with same data 493 runBatchCopyTest(); 494 495 // ONLY_IN_PEER_TABLE_ROWS 496 Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH))); 497 put.addColumn(noRepfamName, row, row); 498 htable3.put(put); 499 500 // CONTENT_DIFFERENT_ROWS 501 put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1))); 502 put.addColumn(noRepfamName, row, Bytes.toBytes("diff value")); 503 htable3.put(put); 504 505 // ONLY_IN_SOURCE_TABLE_ROWS 506 put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1))); 507 put.addColumn(noRepfamName, row, row); 508 htable1.put(put); 509 510 /** 511 * recompareSleep is set to exceed how long we wait on 512 * {@link VerifyReplication#reCompareExecutor} termination when doing cleanup. this allows us to 513 * test the counter-incrementing logic if the executor still hasn't terminated after the call to 514 * shutdown and awaitTermination 515 */ 516 String[] args = new String[] { "--recompareThreads=1", "--recompareTries=1", 517 "--recompareSleep=121000", "--peerTableName=" + peerTableName.getNameAsString(), 518 UTIL2.getClusterKey(), tableName.getNameAsString() }; 519 520 Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3); 521 assertEquals( 522 counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(), 3); 523 assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(), 524 3); 525 assertEquals( 526 counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(), 527 1); 528 assertEquals( 529 counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(), 530 1); 531 assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS) 532 .getValue(), 1); 533 } 534 535 @Test 536 public void testVerifyReplicationSynchronousRecompares() throws Exception { 537 // Populate the tables with same data 538 runBatchCopyTest(); 539 540 // ONLY_IN_PEER_TABLE_ROWS 541 Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH))); 542 put.addColumn(noRepfamName, row, row); 543 htable3.put(put); 544 545 // CONTENT_DIFFERENT_ROWS 546 put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1))); 547 put.addColumn(noRepfamName, row, Bytes.toBytes("diff value")); 548 htable3.put(put); 549 550 // ONLY_IN_SOURCE_TABLE_ROWS 551 put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1))); 552 put.addColumn(noRepfamName, row, row); 553 htable1.put(put); 554 555 String[] args = new String[] { "--recompareTries=3", "--recompareSleep=1", 556 "--peerTableName=" + peerTableName.getNameAsString(), UTIL2.getClusterKey(), 557 tableName.getNameAsString() }; 558 Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3); 559 assertEquals( 560 counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(), 9); 561 assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(), 562 9); 563 assertEquals( 564 counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(), 565 1); 566 assertEquals( 567 counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(), 568 1); 569 assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS) 570 .getValue(), 1); 571 } 572 573 @AfterClass 574 public static void tearDownAfterClass() throws Exception { 575 htable3.close(); 576 TestReplicationBase.tearDownAfterClass(); 577 } 578}