001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellUtil;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Admin;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.ConnectionFactory;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Get;
045import org.apache.hadoop.hbase.client.Put;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.ResultScanner;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableDescriptor;
051import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
052import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
053import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.util.CommonFSUtils;
056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
057import org.apache.hadoop.mapreduce.Counters;
058import org.apache.hadoop.mapreduce.Job;
059import org.junit.AfterClass;
060import org.junit.Before;
061import org.junit.BeforeClass;
062import org.junit.Rule;
063import org.junit.Test;
064import org.junit.rules.TestName;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068public abstract class VerifyReplicationTestBase extends TestReplicationBase {
069
070  private static final Logger LOG =
071    LoggerFactory.getLogger(TestVerifyReplicationZkClusterKey.class);
072
073  private static final String PEER_ID = "2";
074  private static final TableName peerTableName = TableName.valueOf("peerTest");
075  private static Table htable3;
076
077  @Rule
078  public TestName name = new TestName();
079
080  @Before
081  public void setUp() throws Exception {
082    cleanUp();
083    UTIL2.deleteTableData(peerTableName);
084  }
085
086  @BeforeClass
087  public static void setUpBeforeClass() throws Exception {
088    TestReplicationBase.setUpBeforeClass();
089
090    TableDescriptor peerTable =
091      TableDescriptorBuilder.newBuilder(peerTableName)
092        .setColumnFamily(
093          ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100).build())
094        .build();
095
096    Connection connection2 = ConnectionFactory.createConnection(CONF2);
097    try (Admin admin2 = connection2.getAdmin()) {
098      admin2.createTable(peerTable, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
099    }
100    htable3 = connection2.getTable(peerTableName);
101  }
102
103  static Counters runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
104    throws IOException, InterruptedException, ClassNotFoundException {
105    Job job = new VerifyReplication().createSubmittableJob(new Configuration(CONF1), args);
106    if (job == null) {
107      fail("Job wasn't created, see the log");
108    }
109    if (!job.waitForCompletion(true)) {
110      fail("Job failed, see the log");
111    }
112    assertEquals(expectedGoodRows,
113      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
114    assertEquals(expectedBadRows,
115      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
116    return job.getCounters();
117  }
118
119  /**
120   * Do a small loading into a table, make sure the data is really the same, then run the
121   * VerifyReplication job to check the results. Do a second comparison where all the cells are
122   * different.
123   */
124  @Test
125  public void testVerifyRepJob() throws Exception {
126    // Populate the tables, at the same time it guarantees that the tables are
127    // identical since it does the check
128    runSmallBatchTest();
129
130    String[] args = new String[] { PEER_ID, tableName.getNameAsString() };
131    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
132
133    Scan scan = new Scan();
134    ResultScanner rs = htable2.getScanner(scan);
135    Put put = null;
136    for (Result result : rs) {
137      put = new Put(result.getRow());
138      Cell firstVal = result.rawCells()[0];
139      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
140        Bytes.toBytes("diff data"));
141      htable2.put(put);
142    }
143    Delete delete = new Delete(put.getRow());
144    htable2.delete(delete);
145    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
146  }
147
148  /**
149   * Load a row into a table, make sure the data is really the same, delete the row, make sure the
150   * delete marker is replicated, run verify replication with and without raw to check the results.
151   */
152  @Test
153  public void testVerifyRepJobWithRawOptions() throws Exception {
154    LOG.info(name.getMethodName());
155
156    final TableName tableName = TableName.valueOf(name.getMethodName());
157    byte[] familyname = Bytes.toBytes("fam_raw");
158    byte[] row = Bytes.toBytes("row_raw");
159
160    Table lHtable1 = null;
161    Table lHtable2 = null;
162
163    try {
164      ColumnFamilyDescriptor fam = ColumnFamilyDescriptorBuilder.newBuilder(familyname)
165        .setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
166      TableDescriptor table =
167        TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(fam).build();
168
169      Connection connection1 = ConnectionFactory.createConnection(CONF1);
170      Connection connection2 = ConnectionFactory.createConnection(CONF2);
171      try (Admin admin1 = connection1.getAdmin()) {
172        admin1.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
173      }
174      try (Admin admin2 = connection2.getAdmin()) {
175        admin2.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
176      }
177      UTIL1.waitUntilAllRegionsAssigned(tableName);
178      UTIL2.waitUntilAllRegionsAssigned(tableName);
179
180      lHtable1 = UTIL1.getConnection().getTable(tableName);
181      lHtable2 = UTIL2.getConnection().getTable(tableName);
182
183      Put put = new Put(row);
184      put.addColumn(familyname, row, row);
185      lHtable1.put(put);
186
187      Get get = new Get(row);
188      for (int i = 0; i < NB_RETRIES; i++) {
189        if (i == NB_RETRIES - 1) {
190          fail("Waited too much time for put replication");
191        }
192        Result res = lHtable2.get(get);
193        if (res.isEmpty()) {
194          LOG.info("Row not available");
195          Thread.sleep(SLEEP_TIME);
196        } else {
197          assertArrayEquals(res.value(), row);
198          break;
199        }
200      }
201
202      Delete del = new Delete(row);
203      lHtable1.delete(del);
204
205      get = new Get(row);
206      for (int i = 0; i < NB_RETRIES; i++) {
207        if (i == NB_RETRIES - 1) {
208          fail("Waited too much time for del replication");
209        }
210        Result res = lHtable2.get(get);
211        if (res.size() >= 1) {
212          LOG.info("Row not deleted");
213          Thread.sleep(SLEEP_TIME);
214        } else {
215          break;
216        }
217      }
218
219      // Checking verifyReplication for the default behavior.
220      String[] argsWithoutRaw = new String[] { PEER_ID, tableName.getNameAsString() };
221      runVerifyReplication(argsWithoutRaw, 0, 0);
222
223      // Checking verifyReplication with raw
224      String[] argsWithRawAsTrue = new String[] { "--raw", PEER_ID, tableName.getNameAsString() };
225      runVerifyReplication(argsWithRawAsTrue, 1, 0);
226    } finally {
227      if (lHtable1 != null) {
228        lHtable1.close();
229      }
230      if (lHtable2 != null) {
231        lHtable2.close();
232      }
233    }
234  }
235
236  static void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount)
237    throws IOException {
238    FileSystem fs = FileSystem.get(conf);
239    FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir));
240    assertNotNull(subDirectories);
241    assertEquals(subDirectories.length, expectedCount);
242    for (int i = 0; i < expectedCount; i++) {
243      assertTrue(subDirectories[i].isDirectory());
244    }
245  }
246
247  @Test
248  public void testVerifyRepJobWithQuorumAddress() throws Exception {
249    // Populate the tables, at the same time it guarantees that the tables are
250    // identical since it does the check
251    runSmallBatchTest();
252
253    // with a quorum address (a cluster key)
254    String[] args = new String[] { getClusterKey(UTIL2), tableName.getNameAsString() };
255    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
256
257    Scan scan = new Scan();
258    ResultScanner rs = htable2.getScanner(scan);
259    Put put = null;
260    for (Result result : rs) {
261      put = new Put(result.getRow());
262      Cell firstVal = result.rawCells()[0];
263      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
264        Bytes.toBytes("diff data"));
265      htable2.put(put);
266    }
267    Delete delete = new Delete(put.getRow());
268    htable2.delete(delete);
269    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
270  }
271
272  @Test
273  public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Exception {
274    // Populate the tables, at the same time it guarantees that the tables are
275    // identical since it does the check
276    runSmallBatchTest();
277
278    // Take source and target tables snapshot
279    Path rootDir = CommonFSUtils.getRootDir(CONF1);
280    FileSystem fs = rootDir.getFileSystem(CONF1);
281    String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
282    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
283      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
284
285    // Take target snapshot
286    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
287    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
288    String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
289    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
290      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
291
292    String peerFSAddress = peerFs.getUri().toString();
293    String tmpPath1 = UTIL1.getRandomDir().toString();
294    String tmpPath2 = "/tmp" + EnvironmentEdgeManager.currentTime();
295
296    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
297      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
298      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
299      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), getClusterKey(UTIL2),
300      tableName.getNameAsString() };
301    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
302    checkRestoreTmpDir(CONF1, tmpPath1, 1);
303    checkRestoreTmpDir(CONF2, tmpPath2, 1);
304
305    Scan scan = new Scan();
306    ResultScanner rs = htable2.getScanner(scan);
307    Put put = null;
308    for (Result result : rs) {
309      put = new Put(result.getRow());
310      Cell firstVal = result.rawCells()[0];
311      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
312        Bytes.toBytes("diff data"));
313      htable2.put(put);
314    }
315    Delete delete = new Delete(put.getRow());
316    htable2.delete(delete);
317
318    sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
319    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
320      Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
321
322    peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
323    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
324      Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
325
326    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
327      "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
328      "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
329      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), getClusterKey(UTIL2),
330      tableName.getNameAsString() };
331    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
332    checkRestoreTmpDir(CONF1, tmpPath1, 2);
333    checkRestoreTmpDir(CONF2, tmpPath2, 2);
334  }
335
336  static void runBatchCopyTest() throws Exception {
337    // normal Batch tests for htable1
338    loadData("", row, noRepfamName);
339
340    Scan scan1 = new Scan();
341    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
342    ResultScanner scanner1 = htable1.getScanner(scan1);
343    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
344    for (Result result : res1) {
345      Put put = new Put(result.getRow());
346      for (Cell cell : result.rawCells()) {
347        put.add(cell);
348      }
349      puts.add(put);
350    }
351    scanner1.close();
352    assertEquals(NB_ROWS_IN_BATCH, res1.length);
353
354    // Copy the data to htable3
355    htable3.put(puts);
356
357    Scan scan2 = new Scan();
358    ResultScanner scanner2 = htable3.getScanner(scan2);
359    Result[] res2 = scanner2.next(NB_ROWS_IN_BATCH);
360    scanner2.close();
361    assertEquals(NB_ROWS_IN_BATCH, res2.length);
362  }
363
364  @Test
365  public void testVerifyRepJobWithPeerTableName() throws Exception {
366    // Populate the tables with same data
367    runBatchCopyTest();
368
369    // with a peerTableName along with quorum address (a cluster key)
370    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
371      getClusterKey(UTIL2), tableName.getNameAsString() };
372    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
373
374    UTIL2.deleteTableData(peerTableName);
375    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
376  }
377
378  @Test
379  public void testVerifyRepJobWithPeerTableNameAndSnapshotSupport() throws Exception {
380    // Populate the tables with same data
381    runBatchCopyTest();
382
383    // Take source and target tables snapshot
384    Path rootDir = CommonFSUtils.getRootDir(CONF1);
385    FileSystem fs = rootDir.getFileSystem(CONF1);
386    String sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
387    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
388      Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);
389
390    // Take target snapshot
391    Path peerRootDir = CommonFSUtils.getRootDir(CONF2);
392    FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
393    String peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
394    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName,
395      Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);
396
397    String peerFSAddress = peerFs.getUri().toString();
398    String tmpPath1 = UTIL1.getRandomDir().toString();
399    String tmpPath2 = "/tmp" + EnvironmentEdgeManager.currentTime();
400
401    String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
402      "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1,
403      "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2,
404      "--peerFSAddress=" + peerFSAddress,
405      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), getClusterKey(UTIL2),
406      tableName.getNameAsString() };
407    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
408    checkRestoreTmpDir(CONF1, tmpPath1, 1);
409    checkRestoreTmpDir(CONF2, tmpPath2, 1);
410
411    Scan scan = new Scan();
412    ResultScanner rs = htable3.getScanner(scan);
413    Put put = null;
414    for (Result result : rs) {
415      put = new Put(result.getRow());
416      Cell firstVal = result.rawCells()[0];
417      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
418        Bytes.toBytes("diff data"));
419      htable3.put(put);
420    }
421    Delete delete = new Delete(put.getRow());
422    htable3.delete(delete);
423
424    sourceSnapshotName = "sourceSnapshot-" + EnvironmentEdgeManager.currentTime();
425    SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
426      Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);
427
428    peerSnapshotName = "peerSnapshot-" + EnvironmentEdgeManager.currentTime();
429    SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName,
430      Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);
431
432    args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
433      "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1,
434      "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2,
435      "--peerFSAddress=" + peerFSAddress,
436      "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), getClusterKey(UTIL2),
437      tableName.getNameAsString() };
438    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
439    checkRestoreTmpDir(CONF1, tmpPath1, 2);
440    checkRestoreTmpDir(CONF2, tmpPath2, 2);
441  }
442
443  @Test
444  public void testVerifyReplicationThreadedRecompares() throws Exception {
445    // Populate the tables with same data
446    runBatchCopyTest();
447
448    // ONLY_IN_PEER_TABLE_ROWS
449    Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
450    put.addColumn(noRepfamName, row, row);
451    htable3.put(put);
452
453    // CONTENT_DIFFERENT_ROWS
454    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
455    put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
456    htable3.put(put);
457
458    // ONLY_IN_SOURCE_TABLE_ROWS
459    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
460    put.addColumn(noRepfamName, row, row);
461    htable1.put(put);
462
463    String[] args = new String[] { "--recompareThreads=10", "--recompareTries=3",
464      "--recompareSleep=1", "--peerTableName=" + peerTableName.getNameAsString(),
465      getClusterKey(UTIL2), tableName.getNameAsString() };
466    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
467    assertEquals(
468      counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(), 9);
469    assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(),
470      9);
471    assertEquals(
472      counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(),
473      1);
474    assertEquals(
475      counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(),
476      1);
477    assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS)
478      .getValue(), 1);
479  }
480
481  @Test
482  public void testFailsRemainingComparesAfterShutdown() throws Exception {
483    // Populate the tables with same data
484    runBatchCopyTest();
485
486    // ONLY_IN_PEER_TABLE_ROWS
487    Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
488    put.addColumn(noRepfamName, row, row);
489    htable3.put(put);
490
491    // CONTENT_DIFFERENT_ROWS
492    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
493    put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
494    htable3.put(put);
495
496    // ONLY_IN_SOURCE_TABLE_ROWS
497    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
498    put.addColumn(noRepfamName, row, row);
499    htable1.put(put);
500
501    /**
502     * recompareSleep is set to exceed how long we wait on
503     * {@link VerifyReplication#reCompareExecutor} termination when doing cleanup. this allows us to
504     * test the counter-incrementing logic if the executor still hasn't terminated after the call to
505     * shutdown and awaitTermination
506     */
507    String[] args = new String[] { "--recompareThreads=1", "--recompareTries=1",
508      "--recompareSleep=121000", "--peerTableName=" + peerTableName.getNameAsString(),
509      getClusterKey(UTIL2), tableName.getNameAsString() };
510
511    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
512    assertEquals(
513      counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(), 3);
514    assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(),
515      3);
516    assertEquals(
517      counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(),
518      1);
519    assertEquals(
520      counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(),
521      1);
522    assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS)
523      .getValue(), 1);
524  }
525
526  @Test
527  public void testVerifyReplicationSynchronousRecompares() throws Exception {
528    // Populate the tables with same data
529    runBatchCopyTest();
530
531    // ONLY_IN_PEER_TABLE_ROWS
532    Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH)));
533    put.addColumn(noRepfamName, row, row);
534    htable3.put(put);
535
536    // CONTENT_DIFFERENT_ROWS
537    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1)));
538    put.addColumn(noRepfamName, row, Bytes.toBytes("diff value"));
539    htable3.put(put);
540
541    // ONLY_IN_SOURCE_TABLE_ROWS
542    put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1)));
543    put.addColumn(noRepfamName, row, row);
544    htable1.put(put);
545
546    String[] args = new String[] { "--recompareTries=3", "--recompareSleep=1",
547      "--peerTableName=" + peerTableName.getNameAsString(), getClusterKey(UTIL2),
548      tableName.getNameAsString() };
549    Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3);
550    assertEquals(
551      counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(), 9);
552    assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(),
553      9);
554    assertEquals(
555      counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(),
556      1);
557    assertEquals(
558      counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(),
559      1);
560    assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS)
561      .getValue(), 1);
562  }
563
564  @AfterClass
565  public static void tearDownAfterClass() throws Exception {
566    htable3.close();
567    TestReplicationBase.tearDownAfterClass();
568  }
569}