001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileStatus; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.CellUtil; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.Admin; 039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 041import org.apache.hadoop.hbase.client.Connection; 042import org.apache.hadoop.hbase.client.ConnectionFactory; 043import org.apache.hadoop.hbase.client.Delete; 044import org.apache.hadoop.hbase.client.Get; 045import org.apache.hadoop.hbase.client.Put; 046import org.apache.hadoop.hbase.client.Result; 047import org.apache.hadoop.hbase.client.ResultScanner; 048import org.apache.hadoop.hbase.client.Scan; 049import org.apache.hadoop.hbase.client.Table; 050import org.apache.hadoop.hbase.client.TableDescriptor; 051import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 052import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; 053import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; 054import org.apache.hadoop.hbase.util.Bytes; 055import org.apache.hadoop.hbase.util.CommonFSUtils; 056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 057import org.apache.hadoop.mapreduce.Counters; 058import org.apache.hadoop.mapreduce.Job; 059import org.junit.AfterClass; 060import org.junit.Before; 061import org.junit.BeforeClass; 062import org.junit.Rule; 063import org.junit.Test; 064import org.junit.rules.TestName; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068public abstract class VerifyReplicationTestBase extends TestReplicationBase { 069 070 private static final Logger LOG = 071 LoggerFactory.getLogger(TestVerifyReplicationZkClusterKey.class); 072 073 private static final String PEER_ID = "2"; 074 private static final TableName peerTableName = TableName.valueOf("peerTest"); 075 private static Table htable3; 076 077 @Rule 078 public TestName name = new TestName(); 079 080 @Before 081 public void setUp() throws Exception { 082 cleanUp(); 083 UTIL2.deleteTableData(peerTableName); 084 } 085 086 @BeforeClass 087 public static void setUpBeforeClass() throws Exception { 088 TestReplicationBase.setUpBeforeClass(); 089 090 TableDescriptor peerTable = 091 TableDescriptorBuilder.newBuilder(peerTableName) 092 .setColumnFamily( 093 ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100).build()) 094 .build(); 095 096 Connection connection2 = ConnectionFactory.createConnection(CONF2); 097 try (Admin admin2 = connection2.getAdmin()) { 098 admin2.createTable(peerTable, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 099 } 100 htable3 = connection2.getTable(peerTableName); 101 } 102 103 static Counters runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows) 104 throws IOException, InterruptedException, ClassNotFoundException { 105 Job job = new VerifyReplication().createSubmittableJob(new Configuration(CONF1), args); 106 if (job == null) { 107 fail("Job wasn't created, see the log"); 108 } 109 if (!job.waitForCompletion(true)) { 110 fail("Job failed, see the log"); 111 } 112 assertEquals(expectedGoodRows, 113 job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); 114 assertEquals(expectedBadRows, 115 job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); 116 return job.getCounters(); 117 } 118 119 /** 120 * Do a small loading into a table, make sure the data is really the same, then run the 121 * VerifyReplication job to check the results. Do a second comparison where all the cells are 122 * different. 123 */ 124 @Test 125 public void testVerifyRepJob() throws Exception { 126 // Populate the tables, at the same time it guarantees that the tables are 127 // identical since it does the check 128 runSmallBatchTest(); 129 130 String[] args = new String[] { PEER_ID, tableName.getNameAsString() }; 131 runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); 132 133 Scan scan = new Scan(); 134 ResultScanner rs = htable2.getScanner(scan); 135 Put put = null; 136 for (Result result : rs) { 137 put = new Put(result.getRow()); 138 Cell firstVal = result.rawCells()[0]; 139 put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal), 140 Bytes.toBytes("diff data")); 141 htable2.put(put); 142 } 143 Delete delete = new Delete(put.getRow()); 144 htable2.delete(delete); 145 runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); 146 } 147 148 /** 149 * Load a row into a table, make sure the data is really the same, delete the row, make sure the 150 * delete marker is replicated, run verify replication with and without raw to check the results. 151 */ 152 @Test 153 public void testVerifyRepJobWithRawOptions() throws Exception { 154 LOG.info(name.getMethodName()); 155 156 final TableName tableName = TableName.valueOf(name.getMethodName()); 157 byte[] familyname = Bytes.toBytes("fam_raw"); 158 byte[] row = Bytes.toBytes("row_raw"); 159 160 Table lHtable1 = null; 161 Table lHtable2 = null; 162 163 try { 164 ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname) 165 .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build(); 166 TableDescriptor table = 167 TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(fam).build(); 168 169 Connection connection1 = ConnectionFactory.createConnection(CONF1); 170 Connection connection2 = ConnectionFactory.createConnection(CONF2); 171 try (Admin admin1 = connection1.getAdmin()) { 172 admin1.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 173 } 174 try (Admin admin2 = connection2.getAdmin()) { 175 admin2.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 176 } 177 UTIL1.waitUntilAllRegionsAssigned(tableName); 178 UTIL2.waitUntilAllRegionsAssigned(tableName); 179 180 lHtable1 = UTIL1.getConnection().getTable(tableName); 181 lHtable2 = UTIL2.getConnection().getTable(tableName); 182 183 Put put = new Put(row); 184 put.addColumn(familyname, row, row); 185 lHtable1.put(put); 186 187 Get get = new Get(row); 188 for (int i = 0; i < NB_RETRIES; i++) { 189 if (i == NB_RETRIES - 1) { 190 fail("Waited too much time for put replication"); 191 } 192 Result res = lHtable2.get(get); 193 if (res.isEmpty()) { 194 LOG.info("Row not available"); 195 Thread.sleep(SLEEP_TIME); 196 } else { 197 assertArrayEquals(res.value(), row); 198 break; 199 } 200 } 201 202 Delete del = new Delete(row); 203 lHtable1.delete(del); 204 205 get = new Get(row); 206 for (int i = 0; i < NB_RETRIES; i++) { 207 if (i == NB_RETRIES - 1) { 208 fail("Waited too much time for del replication"); 209 } 210 Result res = lHtable2.get(get); 211 if (res.size() >= 1) { 212 LOG.info("Row not deleted"); 213 Thread.sleep(SLEEP_TIME); 214 } else { 215 break; 216 } 217 } 218 219 // Checking verifyReplication for the default behavior. 220 String[] argsWithoutRaw = new String[] { PEER_ID, tableName.getNameAsString() }; 221 runVerifyReplication(argsWithoutRaw, 0, 0); 222 223 // Checking verifyReplication with raw 224 String[] argsWithRawAsTrue = new String[] { "--raw", PEER_ID, tableName.getNameAsString() }; 225 runVerifyReplication(argsWithRawAsTrue, 1, 0); 226 } finally { 227 if (lHtable1 != null) { 228 lHtable1.close(); 229 } 230 if (lHtable2 != null) { 231 lHtable2.close(); 232 } 233 } 234 } 235 236 static void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount) 237 throws IOException { 238 FileSystem fs = FileSystem.get(conf); 239 FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir)); 240 assertNotNull(subDirectories); 241 assertEquals(subDirectories.length, expectedCount); 242 for (int i = 0; i < expectedCount; i++) { 243 assertTrue(subDirectories[i].isDirectory()); 244 } 245 } 246 247 @Test 248 public void testVerifyRepJobWithQuorumAddress() throws Exception { 249 // Populate the tables, at the same time it guarantees that the tables are 250 // identical since it does the check 251 runSmallBatchTest(); 252 253 // with a quorum address (a cluster key) 254 String[] args = new String[] { getClusterKey(UTIL2), tableName.getNameAsString() }; 255 runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); 256 257 Scan scan = new Scan(); 258 ResultScanner rs = htable2.getScanner(scan); 259 Put put = null; 260 for (Result result : rs) { 261 put = new Put(result.getRow()); 262 Cell firstVal = result.rawCells()[0]; 263 put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal), 264 Bytes.toBytes("diff data")); 265 htable2.put(put); 266 } 267 Delete delete = new Delete(put.getRow()); 268 htable2.delete(delete); 269 runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); 270 } 271 272 @Test 273 public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Exception { 274 // Populate the tables, at the same time it guarantees that the tables are 275 // identical since it does the check 276 runSmallBatchTest(); 277 278 // Take source and target tables snapshot 279 Path rootDir = CommonFSUtils.getRootDir(CONF1); 280 FileSystem fs = rootDir.getFileSystem(CONF1); 281 String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime(); 282 SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName, 283 Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true); 284 285 // Take target snapshot 286 Path peerRootDir = CommonFSUtils.getRootDir(CONF2); 287 FileSystem peerFs = peerRootDir.getFileSystem(CONF2); 288 String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime(); 289 SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName, 290 Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true); 291 292 String peerFSAddress = peerFs.getUri().toString(); 293 String tmpPath1 = UTIL1.getRandomDir().toString(); 294 String tmpPath2 = "/tmp" + EnvironmentEdgeManager.currentTime(); 295 296 String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, 297 "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName, 298 "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress, 299 "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), getClusterKey(UTIL2), 300 tableName.getNameAsString() }; 301 runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); 302 checkRestoreTmpDir(CONF1, tmpPath1, 1); 303 checkRestoreTmpDir(CONF2, tmpPath2, 1); 304 305 Scan scan = new Scan(); 306 ResultScanner rs = htable2.getScanner(scan); 307 Put put = null; 308 for (Result result : rs) { 309 put = new Put(result.getRow()); 310 Cell firstVal = result.rawCells()[0]; 311 put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal), 312 Bytes.toBytes("diff data")); 313 htable2.put(put); 314 } 315 Delete delete = new Delete(put.getRow()); 316 htable2.delete(delete); 317 318 sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime(); 319 SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName, 320 Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true); 321 322 peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime(); 323 SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName, 324 Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true); 325 326 args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, 327 "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName, 328 "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress, 329 "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), getClusterKey(UTIL2), 330 tableName.getNameAsString() }; 331 runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); 332 checkRestoreTmpDir(CONF1, tmpPath1, 2); 333 checkRestoreTmpDir(CONF2, tmpPath2, 2); 334 } 335 336 static void runBatchCopyTest() throws Exception { 337 // normal Batch tests for htable1 338 loadData("", row, noRepfamName); 339 340 Scan scan1 = new Scan(); 341 List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH); 342 ResultScanner scanner1 = htable1.getScanner(scan1); 343 Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH); 344 for (Result result : res1) { 345 Put put = new Put(result.getRow()); 346 for (Cell cell : result.rawCells()) { 347 put.add(cell); 348 } 349 puts.add(put); 350 } 351 scanner1.close(); 352 assertEquals(NB_ROWS_IN_BATCH, res1.length); 353 354 // Copy the data to htable3 355 htable3.put(puts); 356 357 Scan scan2 = new Scan(); 358 ResultScanner scanner2 = htable3.getScanner(scan2); 359 Result[] res2 = scanner2.next(NB_ROWS_IN_BATCH); 360 scanner2.close(); 361 assertEquals(NB_ROWS_IN_BATCH, res2.length); 362 } 363 364 @Test 365 public void testVerifyRepJobWithPeerTableName() throws Exception { 366 // Populate the tables with same data 367 runBatchCopyTest(); 368 369 // with a peerTableName along with quorum address (a cluster key) 370 String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(), 371 getClusterKey(UTIL2), tableName.getNameAsString() }; 372 runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); 373 374 UTIL2.deleteTableData(peerTableName); 375 runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); 376 } 377 378 @Test 379 public void testVerifyRepJobWithPeerTableNameAndSnapshotSupport() throws Exception { 380 // Populate the tables with same data 381 runBatchCopyTest(); 382 383 // Take source and target tables snapshot 384 Path rootDir = CommonFSUtils.getRootDir(CONF1); 385 FileSystem fs = rootDir.getFileSystem(CONF1); 386 String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime(); 387 SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName, 388 Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true); 389 390 // Take target snapshot 391 Path peerRootDir = CommonFSUtils.getRootDir(CONF2); 392 FileSystem peerFs = peerRootDir.getFileSystem(CONF2); 393 String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime(); 394 SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName, 395 Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true); 396 397 String peerFSAddress = peerFs.getUri().toString(); 398 String tmpPath1 = UTIL1.getRandomDir().toString(); 399 String tmpPath2 = "/tmp" + EnvironmentEdgeManager.currentTime(); 400 401 String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(), 402 "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1, 403 "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2, 404 "--peerFSAddress=" + peerFSAddress, 405 "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), getClusterKey(UTIL2), 406 tableName.getNameAsString() }; 407 runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); 408 checkRestoreTmpDir(CONF1, tmpPath1, 1); 409 checkRestoreTmpDir(CONF2, tmpPath2, 1); 410 411 Scan scan = new Scan(); 412 ResultScanner rs = htable3.getScanner(scan); 413 Put put = null; 414 for (Result result : rs) { 415 put = new Put(result.getRow()); 416 Cell firstVal = result.rawCells()[0]; 417 put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal), 418 Bytes.toBytes("diff data")); 419 htable3.put(put); 420 } 421 Delete delete = new Delete(put.getRow()); 422 htable3.delete(delete); 423 424 sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime(); 425 SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName, 426 Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true); 427 428 peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime(); 429 SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName, 430 Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true); 431 432 args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(), 433 "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1, 434 "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2, 435 "--peerFSAddress=" + peerFSAddress, 436 "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), getClusterKey(UTIL2), 437 tableName.getNameAsString() }; 438 runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); 439 checkRestoreTmpDir(CONF1, tmpPath1, 2); 440 checkRestoreTmpDir(CONF2, tmpPath2, 2); 441 } 442 443 @Test 444 public void testVerifyReplicationThreadedRecompares() throws Exception { 445 // Populate the tables with same data 446 runBatchCopyTest(); 447 448 // ONLY_IN_PEER_TABLE_ROWS 449 Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH))); 450 put.addColumn(noRepfamName, row, row); 451 htable3.put(put); 452 453 // CONTENT_DIFFERENT_ROWS 454 put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1))); 455 put.addColumn(noRepfamName, row, Bytes.toBytes("diff value")); 456 htable3.put(put); 457 458 // ONLY_IN_SOURCE_TABLE_ROWS 459 put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1))); 460 put.addColumn(noRepfamName, row, row); 461 htable1.put(put); 462 463 String[] args = new String[] { "--recompareThreads=10", "--recompareTries=3", 464 "--recompareSleep=1", "--peerTableName=" + peerTableName.getNameAsString(), 465 getClusterKey(UTIL2), tableName.getNameAsString() }; 466 Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3); 467 assertEquals( 468 counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(), 9); 469 assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(), 470 9); 471 assertEquals( 472 counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(), 473 1); 474 assertEquals( 475 counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(), 476 1); 477 assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS) 478 .getValue(), 1); 479 } 480 481 @Test 482 public void testFailsRemainingComparesAfterShutdown() throws Exception { 483 // Populate the tables with same data 484 runBatchCopyTest(); 485 486 // ONLY_IN_PEER_TABLE_ROWS 487 Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH))); 488 put.addColumn(noRepfamName, row, row); 489 htable3.put(put); 490 491 // CONTENT_DIFFERENT_ROWS 492 put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1))); 493 put.addColumn(noRepfamName, row, Bytes.toBytes("diff value")); 494 htable3.put(put); 495 496 // ONLY_IN_SOURCE_TABLE_ROWS 497 put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1))); 498 put.addColumn(noRepfamName, row, row); 499 htable1.put(put); 500 501 /** 502 * recompareSleep is set to exceed how long we wait on 503 * {@link VerifyReplication#reCompareExecutor} termination when doing cleanup. this allows us to 504 * test the counter-incrementing logic if the executor still hasn't terminated after the call to 505 * shutdown and awaitTermination 506 */ 507 String[] args = new String[] { "--recompareThreads=1", "--recompareTries=1", 508 "--recompareSleep=121000", "--peerTableName=" + peerTableName.getNameAsString(), 509 getClusterKey(UTIL2), tableName.getNameAsString() }; 510 511 Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3); 512 assertEquals( 513 counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(), 3); 514 assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(), 515 3); 516 assertEquals( 517 counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(), 518 1); 519 assertEquals( 520 counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(), 521 1); 522 assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS) 523 .getValue(), 1); 524 } 525 526 @Test 527 public void testVerifyReplicationSynchronousRecompares() throws Exception { 528 // Populate the tables with same data 529 runBatchCopyTest(); 530 531 // ONLY_IN_PEER_TABLE_ROWS 532 Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH))); 533 put.addColumn(noRepfamName, row, row); 534 htable3.put(put); 535 536 // CONTENT_DIFFERENT_ROWS 537 put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1))); 538 put.addColumn(noRepfamName, row, Bytes.toBytes("diff value")); 539 htable3.put(put); 540 541 // ONLY_IN_SOURCE_TABLE_ROWS 542 put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1))); 543 put.addColumn(noRepfamName, row, row); 544 htable1.put(put); 545 546 String[] args = new String[] { "--recompareTries=3", "--recompareSleep=1", 547 "--peerTableName=" + peerTableName.getNameAsString(), getClusterKey(UTIL2), 548 tableName.getNameAsString() }; 549 Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3); 550 assertEquals( 551 counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(), 9); 552 assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(), 553 9); 554 assertEquals( 555 counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(), 556 1); 557 assertEquals( 558 counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(), 559 1); 560 assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS) 561 .getValue(), 1); 562 } 563 564 @AfterClass 565 public static void tearDownAfterClass() throws Exception { 566 htable3.close(); 567 TestReplicationBase.tearDownAfterClass(); 568 } 569}