001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.util.Arrays;
025import java.util.function.BooleanSupplier;
026import org.apache.commons.lang3.ArrayUtils;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.Result;
037import org.apache.hadoop.hbase.client.ResultScanner;
038import org.apache.hadoop.hbase.client.Scan;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter;
041import org.apache.hadoop.hbase.testclassification.LargeTests;
042import org.apache.hadoop.hbase.testclassification.MapReduceTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
045import org.apache.hadoop.mapreduce.Counters;
046import org.junit.AfterClass;
047import org.junit.BeforeClass;
048import org.junit.ClassRule;
049import org.junit.Rule;
050import org.junit.Test;
051import org.junit.experimental.categories.Category;
052import org.junit.rules.TestName;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056/**
057 * Basic test for the SyncTable M/R tool
058 */
059@Category({ MapReduceTests.class, LargeTests.class })
060public class TestSyncTable {
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063    HBaseClassTestRule.forClass(TestSyncTable.class);
064
065  private static final Logger LOG = LoggerFactory.getLogger(TestSyncTable.class);
066
067  private static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil();
068
069  private static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil();
070
071  @Rule
072  public TestName name = new TestName();
073
074  @BeforeClass
075  public static void beforeClass() throws Exception {
076    UTIL1.startMiniCluster(3);
077    UTIL2.startMiniCluster(3);
078  }
079
080  @AfterClass
081  public static void afterClass() throws Exception {
082    UTIL2.shutdownMiniCluster();
083    UTIL1.shutdownMiniCluster();
084  }
085
086  private static byte[][] generateSplits(int numRows, int numRegions) {
087    byte[][] splitRows = new byte[numRegions - 1][];
088    for (int i = 1; i < numRegions; i++) {
089      splitRows[i - 1] = Bytes.toBytes(numRows * i / numRegions);
090    }
091    return splitRows;
092  }
093
094  private void testSyncTable(HBaseTestingUtil source, HBaseTestingUtil target, String... options)
095    throws Exception {
096    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
097    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
098    Path testDir = source.getDataTestDirOnTestFS(name.getMethodName());
099
100    writeTestData(source, sourceTableName, target, targetTableName);
101    hashSourceTable(source, sourceTableName, testDir);
102    Counters syncCounters =
103      syncTables(target.getConfiguration(), sourceTableName, targetTableName, testDir, options);
104    assertEqualTables(90, source, sourceTableName, target, targetTableName, false);
105
106    assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
107    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
108    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
109    assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
110    assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
111    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
112
113    source.deleteTable(sourceTableName);
114    target.deleteTable(targetTableName);
115  }
116
117  @Test
118  public void testSyncTable() throws Exception {
119    testSyncTable(UTIL1, UTIL1);
120  }
121
122  @Test
123  public void testSyncTableToPeerCluster() throws Exception {
124    testSyncTable(UTIL1, UTIL2, "--sourceuri=" + UTIL1.getRpcConnnectionURI());
125  }
126
127  @Test
128  public void testSyncTableFromSourceToPeerCluster() throws Exception {
129    testSyncTable(UTIL2, UTIL1, "--sourceuri=" + UTIL2.getRpcConnnectionURI(),
130      "--targeturi=" + UTIL1.getZkConnectionURI());
131  }
132
133  @Test
134  public void testSyncTableFromSourceToPeerClusterWithClusterKey() throws Exception {
135    testSyncTable(UTIL2, UTIL1, "--sourcezkcluster=" + UTIL2.getClusterKey(),
136      "--targetzkcluster=" + UTIL1.getClusterKey());
137  }
138
139  @Test
140  public void testSyncTableDoDeletesFalse() throws Exception {
141    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
142    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
143    Path testDir = UTIL1.getDataTestDirOnTestFS(name.getMethodName());
144
145    writeTestData(UTIL1, sourceTableName, UTIL1, targetTableName);
146    hashSourceTable(UTIL1, sourceTableName, testDir);
147    Counters syncCounters = syncTables(UTIL1.getConfiguration(), sourceTableName, targetTableName,
148      testDir, "--doDeletes=false");
149    assertTargetDoDeletesFalse(100, UTIL1, sourceTableName, UTIL1, targetTableName);
150
151    assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
152    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
153    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
154    assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
155    assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
156    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
157
158    UTIL1.deleteTable(sourceTableName);
159    UTIL1.deleteTable(targetTableName);
160  }
161
162  @Test
163  public void testSyncTableDoPutsFalse() throws Exception {
164    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
165    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
166    Path testDir = UTIL2.getDataTestDirOnTestFS(name.getMethodName());
167
168    writeTestData(UTIL2, sourceTableName, UTIL2, targetTableName);
169    hashSourceTable(UTIL2, sourceTableName, testDir);
170    Counters syncCounters = syncTables(UTIL2.getConfiguration(), sourceTableName, targetTableName,
171      testDir, "--doPuts=false");
172    assertTargetDoPutsFalse(70, UTIL2, sourceTableName, UTIL2, targetTableName);
173
174    assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
175    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
176    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
177    assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
178    assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
179    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
180
181    UTIL2.deleteTable(sourceTableName);
182    UTIL2.deleteTable(targetTableName);
183  }
184
185  @Test
186  public void testSyncTableIgnoreTimestampsTrue() throws Exception {
187    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
188    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
189    Path testDir = UTIL1.getDataTestDirOnTestFS(name.getMethodName());
190    long current = EnvironmentEdgeManager.currentTime();
191    writeTestData(UTIL1, sourceTableName, UTIL2, targetTableName, current - 1000, current);
192    hashSourceTable(UTIL1, sourceTableName, testDir, "--ignoreTimestamps=true");
193    Counters syncCounters = syncTables(UTIL2.getConfiguration(), sourceTableName, targetTableName,
194      testDir, "--ignoreTimestamps=true", "--sourceuri=" + UTIL1.getRpcConnnectionURI());
195    assertEqualTables(90, UTIL1, sourceTableName, UTIL2, targetTableName, true);
196
197    assertEquals(50, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
198    assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
199    assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
200    assertEquals(30, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
201    assertEquals(30, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
202    assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
203
204    UTIL1.deleteTable(sourceTableName);
205    UTIL2.deleteTable(targetTableName);
206  }
207
208  private void assertCellEquals(Cell sourceCell, Cell targetCell, BooleanSupplier checkTimestamp) {
209    assertTrue("Rows don't match, source: " + sourceCell + ", target: " + targetCell,
210      CellUtil.matchingRows(sourceCell, targetCell));
211    assertTrue("Families don't match, source: " + sourceCell + ", target: " + targetCell,
212      CellUtil.matchingFamily(sourceCell, targetCell));
213    assertTrue("Qualifiers don't match, source: " + sourceCell + ", target: " + targetCell,
214      CellUtil.matchingQualifier(sourceCell, targetCell));
215    if (checkTimestamp.getAsBoolean()) {
216      assertTrue("Timestamps don't match, source: " + sourceCell + ", target: " + targetCell,
217        CellUtil.matchingTimestamp(sourceCell, targetCell));
218    }
219    assertTrue("Values don't match, source: " + sourceCell + ", target: " + targetCell,
220      CellUtil.matchingValue(sourceCell, targetCell));
221  }
222
223  private void assertEqualTables(int expectedRows, HBaseTestingUtil sourceCluster,
224    TableName sourceTableName, HBaseTestingUtil targetCluster, TableName targetTableName,
225    boolean ignoreTimestamps) throws Exception {
226    try (Table sourceTable = sourceCluster.getConnection().getTable(sourceTableName);
227      Table targetTable = targetCluster.getConnection().getTable(targetTableName);
228      ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
229      ResultScanner targetScanner = targetTable.getScanner(new Scan())) {
230      for (int i = 0; i < expectedRows; i++) {
231        Result sourceRow = sourceScanner.next();
232        Result targetRow = targetScanner.next();
233
234        LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow()))
235          + " cells:" + sourceRow);
236        LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow()))
237          + " cells:" + targetRow);
238
239        if (sourceRow == null) {
240          fail("Expected " + expectedRows + " source rows but only found " + i);
241        }
242        if (targetRow == null) {
243          fail("Expected " + expectedRows + " target rows but only found " + i);
244        }
245        Cell[] sourceCells = sourceRow.rawCells();
246        Cell[] targetCells = targetRow.rawCells();
247        if (sourceCells.length != targetCells.length) {
248          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
249          LOG.debug("Target cells: " + Arrays.toString(targetCells));
250          fail("Row " + Bytes.toInt(sourceRow.getRow()) + " has " + sourceCells.length
251            + " cells in source table but " + targetCells.length + " cells in target table");
252        }
253        for (int j = 0; j < sourceCells.length; j++) {
254          Cell sourceCell = sourceCells[j];
255          Cell targetCell = targetCells[j];
256          assertCellEquals(sourceCell, targetCell, () -> !ignoreTimestamps);
257        }
258      }
259      Result sourceRow = sourceScanner.next();
260      if (sourceRow != null) {
261        fail("Source table has more than " + expectedRows + " rows.  Next row: "
262          + Bytes.toInt(sourceRow.getRow()));
263      }
264      Result targetRow = targetScanner.next();
265      if (targetRow != null) {
266        fail("Target table has more than " + expectedRows + " rows.  Next row: "
267          + Bytes.toInt(targetRow.getRow()));
268      }
269    }
270  }
271
272  private void assertTargetDoDeletesFalse(int expectedRows, HBaseTestingUtil sourceCluster,
273    TableName sourceTableName, HBaseTestingUtil targetCluster, TableName targetTableName)
274    throws Exception {
275    try (Table sourceTable = sourceCluster.getConnection().getTable(sourceTableName);
276      Table targetTable = targetCluster.getConnection().getTable(targetTableName);
277
278      ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
279      ResultScanner targetScanner = targetTable.getScanner(new Scan())) {
280      Result targetRow = targetScanner.next();
281      Result sourceRow = sourceScanner.next();
282      int rowsCount = 0;
283      while (targetRow != null) {
284        rowsCount++;
285        // only compares values for existing rows, skipping rows existing on
286        // target only that were not deleted given --doDeletes=false
287        if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) {
288          targetRow = targetScanner.next();
289          continue;
290        }
291
292        LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow()))
293          + " cells:" + sourceRow);
294        LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow()))
295          + " cells:" + targetRow);
296
297        Cell[] sourceCells = sourceRow.rawCells();
298        Cell[] targetCells = targetRow.rawCells();
299        int targetRowKey = Bytes.toInt(targetRow.getRow());
300        if (targetRowKey >= 70 && targetRowKey < 80) {
301          if (sourceCells.length == targetCells.length) {
302            LOG.debug("Source cells: " + Arrays.toString(sourceCells));
303            LOG.debug("Target cells: " + Arrays.toString(targetCells));
304            fail("Row " + targetRowKey + " should have more cells in " + "target than in source");
305          }
306
307        } else {
308          if (sourceCells.length != targetCells.length) {
309            LOG.debug("Source cells: " + Arrays.toString(sourceCells));
310            LOG.debug("Target cells: " + Arrays.toString(targetCells));
311            fail("Row " + Bytes.toInt(sourceRow.getRow()) + " has " + sourceCells.length
312              + " cells in source table but " + targetCells.length + " cells in target table");
313          }
314        }
315        for (int j = 0; j < sourceCells.length; j++) {
316          Cell sourceCell = sourceCells[j];
317          Cell targetCell = targetCells[j];
318          assertCellEquals(sourceCell, targetCell, () -> targetRowKey < 80 && targetRowKey >= 90);
319        }
320        targetRow = targetScanner.next();
321        sourceRow = sourceScanner.next();
322      }
323      assertEquals("Target expected rows does not match.", expectedRows, rowsCount);
324    }
325  }
326
327  private void assertTargetDoPutsFalse(int expectedRows, HBaseTestingUtil sourceCluster,
328    TableName sourceTableName, HBaseTestingUtil targetCluster, TableName targetTableName)
329    throws Exception {
330    try (Table sourceTable = sourceCluster.getConnection().getTable(sourceTableName);
331      Table targetTable = targetCluster.getConnection().getTable(targetTableName);
332      ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
333      ResultScanner targetScanner = targetTable.getScanner(new Scan())) {
334      Result targetRow = targetScanner.next();
335      Result sourceRow = sourceScanner.next();
336      int rowsCount = 0;
337
338      while (targetRow != null) {
339        // only compares values for existing rows, skipping rows existing on
340        // source only that were not added to target given --doPuts=false
341        if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) {
342          sourceRow = sourceScanner.next();
343          continue;
344        }
345
346        LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow()))
347          + " cells:" + sourceRow);
348        LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow()))
349          + " cells:" + targetRow);
350
351        LOG.debug("rowsCount: " + rowsCount);
352
353        Cell[] sourceCells = sourceRow.rawCells();
354        Cell[] targetCells = targetRow.rawCells();
355        int targetRowKey = Bytes.toInt(targetRow.getRow());
356        if (targetRowKey >= 40 && targetRowKey < 60) {
357          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
358          LOG.debug("Target cells: " + Arrays.toString(targetCells));
359          fail("There shouldn't exist any rows between 40 and 60, since "
360            + "Puts are disabled and Deletes are enabled.");
361        } else if (targetRowKey >= 60 && targetRowKey < 70) {
362          if (sourceCells.length == targetCells.length) {
363            LOG.debug("Source cells: " + Arrays.toString(sourceCells));
364            LOG.debug("Target cells: " + Arrays.toString(targetCells));
365            fail(
366              "Row " + Bytes.toInt(sourceRow.getRow()) + " shouldn't have same number of cells.");
367          }
368        } else if (targetRowKey >= 80 && targetRowKey < 90) {
369          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
370          LOG.debug("Target cells: " + Arrays.toString(targetCells));
371          fail("There should be no rows between 80 and 90 on target, as "
372            + "these had different timestamps and should had been deleted.");
373        } else if (targetRowKey >= 90 && targetRowKey < 100) {
374          for (int j = 0; j < sourceCells.length; j++) {
375            Cell sourceCell = sourceCells[j];
376            Cell targetCell = targetCells[j];
377            if (CellUtil.matchingValue(sourceCell, targetCell)) {
378              fail("Cells values should not match for rows between " + "90 and 100. Target row id: "
379                + Bytes.toInt(targetRow.getRow()));
380            }
381          }
382        } else {
383          for (int j = 0; j < sourceCells.length; j++) {
384            Cell sourceCell = sourceCells[j];
385            Cell targetCell = targetCells[j];
386            assertCellEquals(sourceCell, targetCell, () -> true);
387          }
388        }
389        rowsCount++;
390        targetRow = targetScanner.next();
391        sourceRow = sourceScanner.next();
392      }
393      assertEquals("Target expected rows does not match.", expectedRows, rowsCount);
394    }
395  }
396
397  private Counters syncTables(Configuration conf, TableName sourceTableName,
398    TableName targetTableName, Path testDir, String... options) throws Exception {
399    SyncTable syncTable = new SyncTable(conf);
400    String[] args = Arrays.copyOf(options, options.length + 3);
401    args[options.length] = testDir.toString();
402    args[options.length + 1] = sourceTableName.getNameAsString();
403    args[options.length + 2] = targetTableName.getNameAsString();
404    int code = syncTable.run(args);
405    assertEquals("sync table job failed", 0, code);
406
407    LOG.info("Sync tables completed");
408    return syncTable.counters;
409  }
410
411  private void hashSourceTable(HBaseTestingUtil sourceCluster, TableName sourceTableName,
412    Path testDir, String... options) throws Exception {
413    int numHashFiles = 3;
414    long batchSize = 100; // should be 2 batches per region
415    int scanBatch = 1;
416    HashTable hashTable = new HashTable(sourceCluster.getConfiguration());
417    String[] args = Arrays.copyOf(options, options.length + 5);
418    args[options.length] = "--batchsize=" + batchSize;
419    args[options.length + 1] = "--numhashfiles=" + numHashFiles;
420    args[options.length + 2] = "--scanbatch=" + scanBatch;
421    args[options.length + 3] = sourceTableName.getNameAsString();
422    args[options.length + 4] = testDir.toString();
423    int code = hashTable.run(args);
424    assertEquals("hash table job failed", 0, code);
425
426    FileSystem fs = sourceCluster.getTestFileSystem();
427
428    HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
429    assertEquals(sourceTableName.getNameAsString(), tableHash.tableName);
430    assertEquals(batchSize, tableHash.batchSize);
431    assertEquals(numHashFiles, tableHash.numHashFiles);
432    assertEquals(numHashFiles - 1, tableHash.partitions.size());
433
434    LOG.info("Hash table completed");
435  }
436
437  private void writeTestData(HBaseTestingUtil sourceCluster, TableName sourceTableName,
438    HBaseTestingUtil targetCluster, TableName targetTableName, long... timestamps)
439    throws Exception {
440    final byte[] family = Bytes.toBytes("family");
441    final byte[] column1 = Bytes.toBytes("c1");
442    final byte[] column2 = Bytes.toBytes("c2");
443    final byte[] value1 = Bytes.toBytes("val1");
444    final byte[] value2 = Bytes.toBytes("val2");
445    final byte[] value3 = Bytes.toBytes("val3");
446
447    int numRows = 100;
448    int sourceRegions = 10;
449    int targetRegions = 6;
450    if (ArrayUtils.isEmpty(timestamps)) {
451      long current = EnvironmentEdgeManager.currentTime();
452      timestamps = new long[] { current, current };
453    }
454
455    try (
456      Table sourceTable =
457        sourceCluster.createTable(sourceTableName, family, generateSplits(numRows, sourceRegions));
458      Table targetTable = targetCluster.createTable(targetTableName, family,
459        generateSplits(numRows, targetRegions))) {
460
461      int rowIndex = 0;
462      // a bunch of identical rows
463      for (; rowIndex < 40; rowIndex++) {
464        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
465        sourcePut.addColumn(family, column1, timestamps[0], value1);
466        sourcePut.addColumn(family, column2, timestamps[0], value2);
467        sourceTable.put(sourcePut);
468
469        Put targetPut = new Put(Bytes.toBytes(rowIndex));
470        targetPut.addColumn(family, column1, timestamps[1], value1);
471        targetPut.addColumn(family, column2, timestamps[1], value2);
472        targetTable.put(targetPut);
473      }
474      // some rows only in the source table
475      // ROWSWITHDIFFS: 10
476      // TARGETMISSINGROWS: 10
477      // TARGETMISSINGCELLS: 20
478      for (; rowIndex < 50; rowIndex++) {
479        Put put = new Put(Bytes.toBytes(rowIndex));
480        put.addColumn(family, column1, timestamps[0], value1);
481        put.addColumn(family, column2, timestamps[0], value2);
482        sourceTable.put(put);
483      }
484      // some rows only in the target table
485      // ROWSWITHDIFFS: 10
486      // SOURCEMISSINGROWS: 10
487      // SOURCEMISSINGCELLS: 20
488      for (; rowIndex < 60; rowIndex++) {
489        Put put = new Put(Bytes.toBytes(rowIndex));
490        put.addColumn(family, column1, timestamps[1], value1);
491        put.addColumn(family, column2, timestamps[1], value2);
492        targetTable.put(put);
493      }
494      // some rows with 1 missing cell in target table
495      // ROWSWITHDIFFS: 10
496      // TARGETMISSINGCELLS: 10
497      for (; rowIndex < 70; rowIndex++) {
498        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
499        sourcePut.addColumn(family, column1, timestamps[0], value1);
500        sourcePut.addColumn(family, column2, timestamps[0], value2);
501        sourceTable.put(sourcePut);
502
503        Put targetPut = new Put(Bytes.toBytes(rowIndex));
504        targetPut.addColumn(family, column1, timestamps[1], value1);
505        targetTable.put(targetPut);
506      }
507      // some rows with 1 missing cell in source table
508      // ROWSWITHDIFFS: 10
509      // SOURCEMISSINGCELLS: 10
510      for (; rowIndex < 80; rowIndex++) {
511        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
512        sourcePut.addColumn(family, column1, timestamps[0], value1);
513        sourceTable.put(sourcePut);
514
515        Put targetPut = new Put(Bytes.toBytes(rowIndex));
516        targetPut.addColumn(family, column1, timestamps[1], value1);
517        targetPut.addColumn(family, column2, timestamps[1], value2);
518        targetTable.put(targetPut);
519      }
520      // some rows differing only in timestamp
521      // ROWSWITHDIFFS: 10
522      // SOURCEMISSINGCELLS: 20
523      // TARGETMISSINGCELLS: 20
524      for (; rowIndex < 90; rowIndex++) {
525        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
526        sourcePut.addColumn(family, column1, timestamps[0], column1);
527        sourcePut.addColumn(family, column2, timestamps[0], value2);
528        sourceTable.put(sourcePut);
529
530        Put targetPut = new Put(Bytes.toBytes(rowIndex));
531        targetPut.addColumn(family, column1, timestamps[1] + 1, column1);
532        targetPut.addColumn(family, column2, timestamps[1] - 1, value2);
533        targetTable.put(targetPut);
534      }
535      // some rows with different values
536      // ROWSWITHDIFFS: 10
537      // DIFFERENTCELLVALUES: 20
538      for (; rowIndex < numRows; rowIndex++) {
539        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
540        sourcePut.addColumn(family, column1, timestamps[0], value1);
541        sourcePut.addColumn(family, column2, timestamps[0], value2);
542        sourceTable.put(sourcePut);
543
544        Put targetPut = new Put(Bytes.toBytes(rowIndex));
545        targetPut.addColumn(family, column1, timestamps[1], value3);
546        targetPut.addColumn(family, column2, timestamps[1], value3);
547        targetTable.put(targetPut);
548      }
549    }
550  }
551}