001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import static org.junit.Assert.fail; 023 024import java.util.Arrays; 025import java.util.function.BooleanSupplier; 026import org.apache.commons.lang3.ArrayUtils; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseTestingUtil; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.Put; 036import org.apache.hadoop.hbase.client.Result; 037import org.apache.hadoop.hbase.client.ResultScanner; 038import org.apache.hadoop.hbase.client.Scan; 039import org.apache.hadoop.hbase.client.Table; 040import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter; 041import org.apache.hadoop.hbase.testclassification.LargeTests; 042import org.apache.hadoop.hbase.testclassification.MapReduceTests; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 045import org.apache.hadoop.mapreduce.Counters; 046import org.junit.AfterClass; 047import org.junit.BeforeClass; 048import org.junit.ClassRule; 049import org.junit.Rule; 050import org.junit.Test; 051import org.junit.experimental.categories.Category; 052import org.junit.rules.TestName; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056/** 057 * Basic test for the SyncTable M/R tool 058 */ 059@Category({ MapReduceTests.class, LargeTests.class }) 060public class TestSyncTable { 061 @ClassRule 062 public static final HBaseClassTestRule CLASS_RULE = 063 HBaseClassTestRule.forClass(TestSyncTable.class); 064 065 private static final Logger LOG = LoggerFactory.getLogger(TestSyncTable.class); 066 067 private static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil(); 068 069 private static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil(); 070 071 @Rule 072 public TestName name = new TestName(); 073 074 @BeforeClass 075 public static void beforeClass() throws Exception { 076 UTIL1.startMiniCluster(3); 077 UTIL2.startMiniCluster(3); 078 } 079 080 @AfterClass 081 public static void afterClass() throws Exception { 082 UTIL2.shutdownMiniCluster(); 083 UTIL1.shutdownMiniCluster(); 084 } 085 086 private static byte[][] generateSplits(int numRows, int numRegions) { 087 byte[][] splitRows = new byte[numRegions - 1][]; 088 for (int i = 1; i < numRegions; i++) { 089 splitRows[i - 1] = Bytes.toBytes(numRows * i / numRegions); 090 } 091 return splitRows; 092 } 093 094 private void testSyncTable(HBaseTestingUtil source, HBaseTestingUtil target, String... options) 095 throws Exception { 096 final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source"); 097 final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target"); 098 Path testDir = source.getDataTestDirOnTestFS(name.getMethodName()); 099 100 writeTestData(source, sourceTableName, target, targetTableName); 101 hashSourceTable(source, sourceTableName, testDir); 102 Counters syncCounters = 103 syncTables(target.getConfiguration(), sourceTableName, targetTableName, testDir, options); 104 assertEqualTables(90, source, sourceTableName, target, targetTableName, false); 105 106 assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); 107 assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); 108 assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue()); 109 assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue()); 110 assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); 111 assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); 112 113 source.deleteTable(sourceTableName); 114 target.deleteTable(targetTableName); 115 } 116 117 @Test 118 public void testSyncTable() throws Exception { 119 testSyncTable(UTIL1, UTIL1); 120 } 121 122 @Test 123 public void testSyncTableToPeerCluster() throws Exception { 124 testSyncTable(UTIL1, UTIL2, "--sourceuri=" + UTIL1.getRpcConnnectionURI()); 125 } 126 127 @Test 128 public void testSyncTableFromSourceToPeerCluster() throws Exception { 129 testSyncTable(UTIL2, UTIL1, "--sourceuri=" + UTIL2.getRpcConnnectionURI(), 130 "--targeturi=" + UTIL1.getZkConnectionURI()); 131 } 132 133 @Test 134 public void testSyncTableFromSourceToPeerClusterWithClusterKey() throws Exception { 135 testSyncTable(UTIL2, UTIL1, "--sourcezkcluster=" + UTIL2.getClusterKey(), 136 "--targetzkcluster=" + UTIL1.getClusterKey()); 137 } 138 139 @Test 140 public void testSyncTableDoDeletesFalse() throws Exception { 141 final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source"); 142 final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target"); 143 Path testDir = UTIL1.getDataTestDirOnTestFS(name.getMethodName()); 144 145 writeTestData(UTIL1, sourceTableName, UTIL1, targetTableName); 146 hashSourceTable(UTIL1, sourceTableName, testDir); 147 Counters syncCounters = syncTables(UTIL1.getConfiguration(), sourceTableName, targetTableName, 148 testDir, "--doDeletes=false"); 149 assertTargetDoDeletesFalse(100, UTIL1, sourceTableName, UTIL1, targetTableName); 150 151 assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); 152 assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); 153 assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue()); 154 assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue()); 155 assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); 156 assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); 157 158 UTIL1.deleteTable(sourceTableName); 159 UTIL1.deleteTable(targetTableName); 160 } 161 162 @Test 163 public void testSyncTableDoPutsFalse() throws Exception { 164 final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source"); 165 final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target"); 166 Path testDir = UTIL2.getDataTestDirOnTestFS(name.getMethodName()); 167 168 writeTestData(UTIL2, sourceTableName, UTIL2, targetTableName); 169 hashSourceTable(UTIL2, sourceTableName, testDir); 170 Counters syncCounters = syncTables(UTIL2.getConfiguration(), sourceTableName, targetTableName, 171 testDir, "--doPuts=false"); 172 assertTargetDoPutsFalse(70, UTIL2, sourceTableName, UTIL2, targetTableName); 173 174 assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); 175 assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); 176 assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue()); 177 assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue()); 178 assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); 179 assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); 180 181 UTIL2.deleteTable(sourceTableName); 182 UTIL2.deleteTable(targetTableName); 183 } 184 185 @Test 186 public void testSyncTableIgnoreTimestampsTrue() throws Exception { 187 final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source"); 188 final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target"); 189 Path testDir = UTIL1.getDataTestDirOnTestFS(name.getMethodName()); 190 long current = EnvironmentEdgeManager.currentTime(); 191 writeTestData(UTIL1, sourceTableName, UTIL2, targetTableName, current - 1000, current); 192 hashSourceTable(UTIL1, sourceTableName, testDir, "--ignoreTimestamps=true"); 193 Counters syncCounters = syncTables(UTIL2.getConfiguration(), sourceTableName, targetTableName, 194 testDir, "--ignoreTimestamps=true", "--sourceuri=" + UTIL1.getRpcConnnectionURI()); 195 assertEqualTables(90, UTIL1, sourceTableName, UTIL2, targetTableName, true); 196 197 assertEquals(50, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); 198 assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); 199 assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue()); 200 assertEquals(30, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue()); 201 assertEquals(30, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); 202 assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); 203 204 UTIL1.deleteTable(sourceTableName); 205 UTIL2.deleteTable(targetTableName); 206 } 207 208 private void assertCellEquals(Cell sourceCell, Cell targetCell, BooleanSupplier checkTimestamp) { 209 assertTrue("Rows don't match, source: " + sourceCell + ", target: " + targetCell, 210 CellUtil.matchingRows(sourceCell, targetCell)); 211 assertTrue("Families don't match, source: " + sourceCell + ", target: " + targetCell, 212 CellUtil.matchingFamily(sourceCell, targetCell)); 213 assertTrue("Qualifiers don't match, source: " + sourceCell + ", target: " + targetCell, 214 CellUtil.matchingQualifier(sourceCell, targetCell)); 215 if (checkTimestamp.getAsBoolean()) { 216 assertTrue("Timestamps don't match, source: " + sourceCell + ", target: " + targetCell, 217 CellUtil.matchingTimestamp(sourceCell, targetCell)); 218 } 219 assertTrue("Values don't match, source: " + sourceCell + ", target: " + targetCell, 220 CellUtil.matchingValue(sourceCell, targetCell)); 221 } 222 223 private void assertEqualTables(int expectedRows, HBaseTestingUtil sourceCluster, 224 TableName sourceTableName, HBaseTestingUtil targetCluster, TableName targetTableName, 225 boolean ignoreTimestamps) throws Exception { 226 try (Table sourceTable = sourceCluster.getConnection().getTable(sourceTableName); 227 Table targetTable = targetCluster.getConnection().getTable(targetTableName); 228 ResultScanner sourceScanner = sourceTable.getScanner(new Scan()); 229 ResultScanner targetScanner = targetTable.getScanner(new Scan())) { 230 for (int i = 0; i < expectedRows; i++) { 231 Result sourceRow = sourceScanner.next(); 232 Result targetRow = targetScanner.next(); 233 234 LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow())) 235 + " cells:" + sourceRow); 236 LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow())) 237 + " cells:" + targetRow); 238 239 if (sourceRow == null) { 240 fail("Expected " + expectedRows + " source rows but only found " + i); 241 } 242 if (targetRow == null) { 243 fail("Expected " + expectedRows + " target rows but only found " + i); 244 } 245 Cell[] sourceCells = sourceRow.rawCells(); 246 Cell[] targetCells = targetRow.rawCells(); 247 if (sourceCells.length != targetCells.length) { 248 LOG.debug("Source cells: " + Arrays.toString(sourceCells)); 249 LOG.debug("Target cells: " + Arrays.toString(targetCells)); 250 fail("Row " + Bytes.toInt(sourceRow.getRow()) + " has " + sourceCells.length 251 + " cells in source table but " + targetCells.length + " cells in target table"); 252 } 253 for (int j = 0; j < sourceCells.length; j++) { 254 Cell sourceCell = sourceCells[j]; 255 Cell targetCell = targetCells[j]; 256 assertCellEquals(sourceCell, targetCell, () -> !ignoreTimestamps); 257 } 258 } 259 Result sourceRow = sourceScanner.next(); 260 if (sourceRow != null) { 261 fail("Source table has more than " + expectedRows + " rows. Next row: " 262 + Bytes.toInt(sourceRow.getRow())); 263 } 264 Result targetRow = targetScanner.next(); 265 if (targetRow != null) { 266 fail("Target table has more than " + expectedRows + " rows. Next row: " 267 + Bytes.toInt(targetRow.getRow())); 268 } 269 } 270 } 271 272 private void assertTargetDoDeletesFalse(int expectedRows, HBaseTestingUtil sourceCluster, 273 TableName sourceTableName, HBaseTestingUtil targetCluster, TableName targetTableName) 274 throws Exception { 275 try (Table sourceTable = sourceCluster.getConnection().getTable(sourceTableName); 276 Table targetTable = targetCluster.getConnection().getTable(targetTableName); 277 278 ResultScanner sourceScanner = sourceTable.getScanner(new Scan()); 279 ResultScanner targetScanner = targetTable.getScanner(new Scan())) { 280 Result targetRow = targetScanner.next(); 281 Result sourceRow = sourceScanner.next(); 282 int rowsCount = 0; 283 while (targetRow != null) { 284 rowsCount++; 285 // only compares values for existing rows, skipping rows existing on 286 // target only that were not deleted given --doDeletes=false 287 if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) { 288 targetRow = targetScanner.next(); 289 continue; 290 } 291 292 LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow())) 293 + " cells:" + sourceRow); 294 LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow())) 295 + " cells:" + targetRow); 296 297 Cell[] sourceCells = sourceRow.rawCells(); 298 Cell[] targetCells = targetRow.rawCells(); 299 int targetRowKey = Bytes.toInt(targetRow.getRow()); 300 if (targetRowKey >= 70 && targetRowKey < 80) { 301 if (sourceCells.length == targetCells.length) { 302 LOG.debug("Source cells: " + Arrays.toString(sourceCells)); 303 LOG.debug("Target cells: " + Arrays.toString(targetCells)); 304 fail("Row " + targetRowKey + " should have more cells in " + "target than in source"); 305 } 306 307 } else { 308 if (sourceCells.length != targetCells.length) { 309 LOG.debug("Source cells: " + Arrays.toString(sourceCells)); 310 LOG.debug("Target cells: " + Arrays.toString(targetCells)); 311 fail("Row " + Bytes.toInt(sourceRow.getRow()) + " has " + sourceCells.length 312 + " cells in source table but " + targetCells.length + " cells in target table"); 313 } 314 } 315 for (int j = 0; j < sourceCells.length; j++) { 316 Cell sourceCell = sourceCells[j]; 317 Cell targetCell = targetCells[j]; 318 assertCellEquals(sourceCell, targetCell, () -> targetRowKey < 80 && targetRowKey >= 90); 319 } 320 targetRow = targetScanner.next(); 321 sourceRow = sourceScanner.next(); 322 } 323 assertEquals("Target expected rows does not match.", expectedRows, rowsCount); 324 } 325 } 326 327 private void assertTargetDoPutsFalse(int expectedRows, HBaseTestingUtil sourceCluster, 328 TableName sourceTableName, HBaseTestingUtil targetCluster, TableName targetTableName) 329 throws Exception { 330 try (Table sourceTable = sourceCluster.getConnection().getTable(sourceTableName); 331 Table targetTable = targetCluster.getConnection().getTable(targetTableName); 332 ResultScanner sourceScanner = sourceTable.getScanner(new Scan()); 333 ResultScanner targetScanner = targetTable.getScanner(new Scan())) { 334 Result targetRow = targetScanner.next(); 335 Result sourceRow = sourceScanner.next(); 336 int rowsCount = 0; 337 338 while (targetRow != null) { 339 // only compares values for existing rows, skipping rows existing on 340 // source only that were not added to target given --doPuts=false 341 if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) { 342 sourceRow = sourceScanner.next(); 343 continue; 344 } 345 346 LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow())) 347 + " cells:" + sourceRow); 348 LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow())) 349 + " cells:" + targetRow); 350 351 LOG.debug("rowsCount: " + rowsCount); 352 353 Cell[] sourceCells = sourceRow.rawCells(); 354 Cell[] targetCells = targetRow.rawCells(); 355 int targetRowKey = Bytes.toInt(targetRow.getRow()); 356 if (targetRowKey >= 40 && targetRowKey < 60) { 357 LOG.debug("Source cells: " + Arrays.toString(sourceCells)); 358 LOG.debug("Target cells: " + Arrays.toString(targetCells)); 359 fail("There shouldn't exist any rows between 40 and 60, since " 360 + "Puts are disabled and Deletes are enabled."); 361 } else if (targetRowKey >= 60 && targetRowKey < 70) { 362 if (sourceCells.length == targetCells.length) { 363 LOG.debug("Source cells: " + Arrays.toString(sourceCells)); 364 LOG.debug("Target cells: " + Arrays.toString(targetCells)); 365 fail( 366 "Row " + Bytes.toInt(sourceRow.getRow()) + " shouldn't have same number of cells."); 367 } 368 } else if (targetRowKey >= 80 && targetRowKey < 90) { 369 LOG.debug("Source cells: " + Arrays.toString(sourceCells)); 370 LOG.debug("Target cells: " + Arrays.toString(targetCells)); 371 fail("There should be no rows between 80 and 90 on target, as " 372 + "these had different timestamps and should had been deleted."); 373 } else if (targetRowKey >= 90 && targetRowKey < 100) { 374 for (int j = 0; j < sourceCells.length; j++) { 375 Cell sourceCell = sourceCells[j]; 376 Cell targetCell = targetCells[j]; 377 if (CellUtil.matchingValue(sourceCell, targetCell)) { 378 fail("Cells values should not match for rows between " + "90 and 100. Target row id: " 379 + Bytes.toInt(targetRow.getRow())); 380 } 381 } 382 } else { 383 for (int j = 0; j < sourceCells.length; j++) { 384 Cell sourceCell = sourceCells[j]; 385 Cell targetCell = targetCells[j]; 386 assertCellEquals(sourceCell, targetCell, () -> true); 387 } 388 } 389 rowsCount++; 390 targetRow = targetScanner.next(); 391 sourceRow = sourceScanner.next(); 392 } 393 assertEquals("Target expected rows does not match.", expectedRows, rowsCount); 394 } 395 } 396 397 private Counters syncTables(Configuration conf, TableName sourceTableName, 398 TableName targetTableName, Path testDir, String... options) throws Exception { 399 SyncTable syncTable = new SyncTable(conf); 400 String[] args = Arrays.copyOf(options, options.length + 3); 401 args[options.length] = testDir.toString(); 402 args[options.length + 1] = sourceTableName.getNameAsString(); 403 args[options.length + 2] = targetTableName.getNameAsString(); 404 int code = syncTable.run(args); 405 assertEquals("sync table job failed", 0, code); 406 407 LOG.info("Sync tables completed"); 408 return syncTable.counters; 409 } 410 411 private void hashSourceTable(HBaseTestingUtil sourceCluster, TableName sourceTableName, 412 Path testDir, String... options) throws Exception { 413 int numHashFiles = 3; 414 long batchSize = 100; // should be 2 batches per region 415 int scanBatch = 1; 416 HashTable hashTable = new HashTable(sourceCluster.getConfiguration()); 417 String[] args = Arrays.copyOf(options, options.length + 5); 418 args[options.length] = "--batchsize=" + batchSize; 419 args[options.length + 1] = "--numhashfiles=" + numHashFiles; 420 args[options.length + 2] = "--scanbatch=" + scanBatch; 421 args[options.length + 3] = sourceTableName.getNameAsString(); 422 args[options.length + 4] = testDir.toString(); 423 int code = hashTable.run(args); 424 assertEquals("hash table job failed", 0, code); 425 426 FileSystem fs = sourceCluster.getTestFileSystem(); 427 428 HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir); 429 assertEquals(sourceTableName.getNameAsString(), tableHash.tableName); 430 assertEquals(batchSize, tableHash.batchSize); 431 assertEquals(numHashFiles, tableHash.numHashFiles); 432 assertEquals(numHashFiles - 1, tableHash.partitions.size()); 433 434 LOG.info("Hash table completed"); 435 } 436 437 private void writeTestData(HBaseTestingUtil sourceCluster, TableName sourceTableName, 438 HBaseTestingUtil targetCluster, TableName targetTableName, long... timestamps) 439 throws Exception { 440 final byte[] family = Bytes.toBytes("family"); 441 final byte[] column1 = Bytes.toBytes("c1"); 442 final byte[] column2 = Bytes.toBytes("c2"); 443 final byte[] value1 = Bytes.toBytes("val1"); 444 final byte[] value2 = Bytes.toBytes("val2"); 445 final byte[] value3 = Bytes.toBytes("val3"); 446 447 int numRows = 100; 448 int sourceRegions = 10; 449 int targetRegions = 6; 450 if (ArrayUtils.isEmpty(timestamps)) { 451 long current = EnvironmentEdgeManager.currentTime(); 452 timestamps = new long[] { current, current }; 453 } 454 455 try ( 456 Table sourceTable = 457 sourceCluster.createTable(sourceTableName, family, generateSplits(numRows, sourceRegions)); 458 Table targetTable = targetCluster.createTable(targetTableName, family, 459 generateSplits(numRows, targetRegions))) { 460 461 int rowIndex = 0; 462 // a bunch of identical rows 463 for (; rowIndex < 40; rowIndex++) { 464 Put sourcePut = new Put(Bytes.toBytes(rowIndex)); 465 sourcePut.addColumn(family, column1, timestamps[0], value1); 466 sourcePut.addColumn(family, column2, timestamps[0], value2); 467 sourceTable.put(sourcePut); 468 469 Put targetPut = new Put(Bytes.toBytes(rowIndex)); 470 targetPut.addColumn(family, column1, timestamps[1], value1); 471 targetPut.addColumn(family, column2, timestamps[1], value2); 472 targetTable.put(targetPut); 473 } 474 // some rows only in the source table 475 // ROWSWITHDIFFS: 10 476 // TARGETMISSINGROWS: 10 477 // TARGETMISSINGCELLS: 20 478 for (; rowIndex < 50; rowIndex++) { 479 Put put = new Put(Bytes.toBytes(rowIndex)); 480 put.addColumn(family, column1, timestamps[0], value1); 481 put.addColumn(family, column2, timestamps[0], value2); 482 sourceTable.put(put); 483 } 484 // some rows only in the target table 485 // ROWSWITHDIFFS: 10 486 // SOURCEMISSINGROWS: 10 487 // SOURCEMISSINGCELLS: 20 488 for (; rowIndex < 60; rowIndex++) { 489 Put put = new Put(Bytes.toBytes(rowIndex)); 490 put.addColumn(family, column1, timestamps[1], value1); 491 put.addColumn(family, column2, timestamps[1], value2); 492 targetTable.put(put); 493 } 494 // some rows with 1 missing cell in target table 495 // ROWSWITHDIFFS: 10 496 // TARGETMISSINGCELLS: 10 497 for (; rowIndex < 70; rowIndex++) { 498 Put sourcePut = new Put(Bytes.toBytes(rowIndex)); 499 sourcePut.addColumn(family, column1, timestamps[0], value1); 500 sourcePut.addColumn(family, column2, timestamps[0], value2); 501 sourceTable.put(sourcePut); 502 503 Put targetPut = new Put(Bytes.toBytes(rowIndex)); 504 targetPut.addColumn(family, column1, timestamps[1], value1); 505 targetTable.put(targetPut); 506 } 507 // some rows with 1 missing cell in source table 508 // ROWSWITHDIFFS: 10 509 // SOURCEMISSINGCELLS: 10 510 for (; rowIndex < 80; rowIndex++) { 511 Put sourcePut = new Put(Bytes.toBytes(rowIndex)); 512 sourcePut.addColumn(family, column1, timestamps[0], value1); 513 sourceTable.put(sourcePut); 514 515 Put targetPut = new Put(Bytes.toBytes(rowIndex)); 516 targetPut.addColumn(family, column1, timestamps[1], value1); 517 targetPut.addColumn(family, column2, timestamps[1], value2); 518 targetTable.put(targetPut); 519 } 520 // some rows differing only in timestamp 521 // ROWSWITHDIFFS: 10 522 // SOURCEMISSINGCELLS: 20 523 // TARGETMISSINGCELLS: 20 524 for (; rowIndex < 90; rowIndex++) { 525 Put sourcePut = new Put(Bytes.toBytes(rowIndex)); 526 sourcePut.addColumn(family, column1, timestamps[0], column1); 527 sourcePut.addColumn(family, column2, timestamps[0], value2); 528 sourceTable.put(sourcePut); 529 530 Put targetPut = new Put(Bytes.toBytes(rowIndex)); 531 targetPut.addColumn(family, column1, timestamps[1] + 1, column1); 532 targetPut.addColumn(family, column2, timestamps[1] - 1, value2); 533 targetTable.put(targetPut); 534 } 535 // some rows with different values 536 // ROWSWITHDIFFS: 10 537 // DIFFERENTCELLVALUES: 20 538 for (; rowIndex < numRows; rowIndex++) { 539 Put sourcePut = new Put(Bytes.toBytes(rowIndex)); 540 sourcePut.addColumn(family, column1, timestamps[0], value1); 541 sourcePut.addColumn(family, column2, timestamps[0], value2); 542 sourceTable.put(sourcePut); 543 544 Put targetPut = new Put(Bytes.toBytes(rowIndex)); 545 targetPut.addColumn(family, column1, timestamps[1], value3); 546 targetPut.addColumn(family, column2, timestamps[1], value3); 547 targetTable.put(targetPut); 548 } 549 } 550 } 551}