/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.mapreduce.Counters;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;

/**
 * Basic test for the SyncTable M/R tool.
 * <p>
 * Every test writes the same 100-row fixture (see {@link #writeTestData}) to a source and a target
 * table, hashes the source with {@link HashTable}, runs {@link SyncTable} with a given set of
 * options and then verifies both the resulting target table and the job counters.
 */
@Category(LargeTests.class)
public class TestSyncTable {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSyncTable.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestSyncTable.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void beforeClass() throws Exception {
    TEST_UTIL.startMiniCluster(3);
  }

  @AfterClass
  public static void afterClass() throws Exception {
    TEST_UTIL.cleanupDataTestDirOnTestFS();
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Builds {@code numRegions - 1} evenly spaced split keys over the int row keys
   * {@code [0, numRows)}.
   * @param numRows    number of rows the table will hold
   * @param numRegions number of regions wanted
   * @return the split keys, each one an int serialized with {@link Bytes#toBytes(int)}
   */
  private static byte[][] generateSplits(int numRows, int numRegions) {
    byte[][] splitRows = new byte[numRegions - 1][];
    for (int i = 1; i < numRegions; i++) {
      splitRows[i - 1] = Bytes.toBytes(numRows * i / numRegions);
    }
    return splitRows;
  }

  @Test
  public void testSyncTable() throws Exception {
    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
    Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTable");

    writeTestData(sourceTableName, targetTableName);
    hashSourceTable(sourceTableName, testDir);
    Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir);
    assertEqualTables(90, sourceTableName, targetTableName, false);

    assertCounters(syncCounters, 60, 10, 10, 50, 50, 20);

    TEST_UTIL.deleteTable(sourceTableName);
    TEST_UTIL.deleteTable(targetTableName);
  }

  @Test
  public void testSyncTableDoDeletesFalse() throws Exception {
    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
    Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableDoDeletesFalse");

    writeTestData(sourceTableName, targetTableName);
    hashSourceTable(sourceTableName, testDir);
    Counters syncCounters =
      syncTables(sourceTableName, targetTableName, testDir, "--doDeletes=false");
    assertTargetDoDeletesFalse(100, sourceTableName, targetTableName);

    assertCounters(syncCounters, 60, 10, 10, 50, 50, 20);

    TEST_UTIL.deleteTable(sourceTableName);
    TEST_UTIL.deleteTable(targetTableName);
  }

  @Test
  public void testSyncTableDoPutsFalse() throws Exception {
    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
    Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableDoPutsFalse");

    writeTestData(sourceTableName, targetTableName);
    hashSourceTable(sourceTableName, testDir);
    Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir, "--doPuts=false");
    assertTargetDoPutsFalse(70, sourceTableName, targetTableName);

    assertCounters(syncCounters, 60, 10, 10, 50, 50, 20);

    TEST_UTIL.deleteTable(sourceTableName);
    TEST_UTIL.deleteTable(targetTableName);
  }

  @Test
  public void testSyncTableIgnoreTimestampsTrue() throws Exception {
    final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
    final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
    Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableIgnoreTimestampsTrue");
    long current = EnvironmentEdgeManager.currentTime();
    writeTestData(sourceTableName, targetTableName, current - 1000, current);
    hashSourceTable(sourceTableName, testDir, "--ignoreTimestamps=true");
    Counters syncCounters =
      syncTables(sourceTableName, targetTableName, testDir, "--ignoreTimestamps=true");
    assertEqualTables(90, sourceTableName, targetTableName, true);

    assertCounters(syncCounters, 50, 10, 10, 30, 30, 20);

    TEST_UTIL.deleteTable(sourceTableName);
    TEST_UTIL.deleteTable(targetTableName);
  }

  /**
   * Asserts the six SyncTable mapper counters in one call, naming the offending counter on
   * failure.
   */
  private static void assertCounters(Counters counters, long rowsWithDiffs, long sourceMissingRows,
    long targetMissingRows, long sourceMissingCells, long targetMissingCells,
    long differentCellValues) {
    assertCounter(counters, Counter.ROWSWITHDIFFS, rowsWithDiffs);
    assertCounter(counters, Counter.SOURCEMISSINGROWS, sourceMissingRows);
    assertCounter(counters, Counter.TARGETMISSINGROWS, targetMissingRows);
    assertCounter(counters, Counter.SOURCEMISSINGCELLS, sourceMissingCells);
    assertCounter(counters, Counter.TARGETMISSINGCELLS, targetMissingCells);
    assertCounter(counters, Counter.DIFFERENTCELLVALUES, differentCellValues);
  }

  /** Asserts the value of a single SyncTable mapper counter. */
  private static void assertCounter(Counters counters, Counter counter, long expected) {
    assertEquals("Unexpected value for counter " + counter, expected,
      counters.findCounter(counter).getValue());
  }

  /** Logs the int row key and the content of a scanned row at debug level; tolerates null. */
  private static void logRow(String label, Result row) {
    LOG.debug("{} row: {} cells:{}", label, row == null ? "null" : Bytes.toInt(row.getRow()),
      row);
  }

  /** Logs the cells of a source row and a target row at debug level. */
  private static void logCells(Cell[] sourceCells, Cell[] targetCells) {
    LOG.debug("Source cells: {}", Arrays.toString(sourceCells));
    LOG.debug("Target cells: {}", Arrays.toString(targetCells));
  }

  /**
   * Fails unless both cells have the same row, family, qualifier, value and, when requested,
   * timestamp. The two cells are logged at debug level before the failure is rethrown.
   * @param sourceCell      cell read from the source table
   * @param targetCell      cell read from the target table
   * @param checkTimestamps whether the timestamps have to match as well
   */
  private static void assertCellsMatch(Cell sourceCell, Cell targetCell,
    boolean checkTimestamps) {
    try {
      if (!CellUtil.matchingRows(sourceCell, targetCell)) {
        Assert.fail("Rows don't match");
      }
      if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
        Assert.fail("Families don't match");
      }
      if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
        Assert.fail("Qualifiers don't match");
      }
      if (checkTimestamps && !CellUtil.matchingTimestamp(sourceCell, targetCell)) {
        Assert.fail("Timestamps don't match");
      }
      if (!CellUtil.matchingValue(sourceCell, targetCell)) {
        Assert.fail("Values don't match");
      }
    } catch (Throwable t) {
      LOG.debug("Source cell: {} target cell: {}", sourceCell, targetCell);
      Throwables.propagate(t);
    }
  }

  /**
   * Asserts that source and target hold exactly {@code expectedRows} rows each and that those rows
   * are identical cell by cell.
   * @param expectedRows     number of rows expected in both tables
   * @param sourceTableName  source table
   * @param targetTableName  target table
   * @param ignoreTimestamps when true, cell timestamps are not compared
   */
  private void assertEqualTables(int expectedRows, TableName sourceTableName,
    TableName targetTableName, boolean ignoreTimestamps) throws Exception {
    // try-with-resources so that a failed assertion does not leak scanners or tables
    try (Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
      Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
      ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
      ResultScanner targetScanner = targetTable.getScanner(new Scan())) {
      for (int i = 0; i < expectedRows; i++) {
        Result sourceRow = sourceScanner.next();
        Result targetRow = targetScanner.next();

        logRow("SOURCE", sourceRow);
        logRow("TARGET", targetRow);

        if (sourceRow == null) {
          Assert.fail("Expected " + expectedRows + " source rows but only found " + i);
        }
        if (targetRow == null) {
          Assert.fail("Expected " + expectedRows + " target rows but only found " + i);
        }
        Cell[] sourceCells = sourceRow.rawCells();
        Cell[] targetCells = targetRow.rawCells();
        if (sourceCells.length != targetCells.length) {
          logCells(sourceCells, targetCells);
          Assert.fail("Row " + Bytes.toInt(sourceRow.getRow()) + " has " + sourceCells.length
            + " cells in source table but " + targetCells.length + " cells in target table");
        }
        for (int j = 0; j < sourceCells.length; j++) {
          assertCellsMatch(sourceCells[j], targetCells[j], !ignoreTimestamps);
        }
      }
      Result sourceRow = sourceScanner.next();
      if (sourceRow != null) {
        Assert.fail("Source table has more than " + expectedRows + " rows.  Next row: "
          + Bytes.toInt(sourceRow.getRow()));
      }
      Result targetRow = targetScanner.next();
      if (targetRow != null) {
        Assert.fail("Target table has more than " + expectedRows + " rows.  Next row: "
          + Bytes.toInt(targetRow.getRow()));
      }
    }
  }

  /**
   * Verifies the target table after a sync run with {@code --doDeletes=false}: rows and cells that
   * exist only on the target must still be there, everything else must match the source.
   * @param expectedRows    number of rows expected in the target table
   * @param sourceTableName source table
   * @param targetTableName target table
   */
  private void assertTargetDoDeletesFalse(int expectedRows, TableName sourceTableName,
    TableName targetTableName) throws Exception {
    try (Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
      Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
      ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
      ResultScanner targetScanner = targetTable.getScanner(new Scan())) {
      Result targetRow = targetScanner.next();
      Result sourceRow = sourceScanner.next();
      int rowsCount = 0;
      while (targetRow != null) {
        rowsCount++;
        int targetRowKey = Bytes.toInt(targetRow.getRow());
        // only compares values for existing rows, skipping rows existing on
        // target only that were not deleted given --doDeletes=false
        // (an exhausted source scanner means every remaining target row is target-only)
        if (sourceRow == null || Bytes.toInt(sourceRow.getRow()) != targetRowKey) {
          targetRow = targetScanner.next();
          continue;
        }

        logRow("SOURCE", sourceRow);
        logRow("TARGET", targetRow);

        Cell[] sourceCells = sourceRow.rawCells();
        Cell[] targetCells = targetRow.rawCells();
        if (targetRowKey >= 70 && targetRowKey < 80) {
          // rows 70-79 carry one extra cell on target which must have survived the sync
          if (sourceCells.length == targetCells.length) {
            logCells(sourceCells, targetCells);
            Assert
              .fail("Row " + targetRowKey + " should have more cells in " + "target than in source");
          }
        } else {
          if (sourceCells.length != targetCells.length) {
            logCells(sourceCells, targetCells);
            Assert.fail("Row " + Bytes.toInt(sourceRow.getRow()) + " has " + sourceCells.length
              + " cells in source table but " + targetCells.length + " cells in target table");
          }
        }
        // Rows 80-89 were written with different timestamps on each side and, with deletes
        // disabled, the target keeps its own cells, so timestamps can only be compared outside
        // of that range. (The range test used to be "< 80 && >= 90", which is never true.)
        boolean checkTimestamps = targetRowKey < 80 || targetRowKey >= 90;
        for (int j = 0; j < sourceCells.length; j++) {
          assertCellsMatch(sourceCells[j], targetCells[j], checkTimestamps);
        }
        targetRow = targetScanner.next();
        sourceRow = sourceScanner.next();
      }
      assertEquals("Target expected rows does not match.", expectedRows, rowsCount);
    }
  }

  /**
   * Verifies the target table after a sync run with {@code --doPuts=false}: nothing may have been
   * added to or overwritten on the target, while rows and cells missing on the source must have
   * been deleted.
   * @param expectedRows    number of rows expected in the target table
   * @param sourceTableName source table
   * @param targetTableName target table
   */
  private void assertTargetDoPutsFalse(int expectedRows, TableName sourceTableName,
    TableName targetTableName) throws Exception {
    try (Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
      Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
      ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
      ResultScanner targetScanner = targetTable.getScanner(new Scan())) {
      Result targetRow = targetScanner.next();
      Result sourceRow = sourceScanner.next();
      int rowsCount = 0;

      while (targetRow != null) {
        int targetRowKey = Bytes.toInt(targetRow.getRow());
        if (sourceRow == null) {
          // with deletes enabled a row unknown to the source must not survive on the target
          Assert.fail("Target row " + targetRowKey + " has no matching row in source table");
        }
        // only compares values for existing rows, skipping rows existing on
        // source only that were not added to target given --doPuts=false
        if (Bytes.toInt(sourceRow.getRow()) != targetRowKey) {
          sourceRow = sourceScanner.next();
          continue;
        }

        logRow("SOURCE", sourceRow);
        logRow("TARGET", targetRow);

        LOG.debug("rowsCount: {}", rowsCount);

        Cell[] sourceCells = sourceRow.rawCells();
        Cell[] targetCells = targetRow.rawCells();
        if (targetRowKey >= 40 && targetRowKey < 60) {
          logCells(sourceCells, targetCells);
          Assert.fail("There shouldn't exist any rows between 40 and 60, since "
            + "Puts are disabled and Deletes are enabled.");
        } else if (targetRowKey >= 60 && targetRowKey < 70) {
          if (sourceCells.length == targetCells.length) {
            logCells(sourceCells, targetCells);
            Assert.fail(
              "Row " + Bytes.toInt(sourceRow.getRow()) + " shouldn't have same number of cells.");
          }
        } else if (targetRowKey >= 80 && targetRowKey < 90) {
          logCells(sourceCells, targetCells);
          Assert.fail("There should be no rows between 80 and 90 on target, as "
            + "these had different timestamps and should had been deleted.");
        } else if (targetRowKey >= 90 && targetRowKey < 100) {
          // values differ in the fixture and must not have been overwritten without puts
          for (int j = 0; j < sourceCells.length; j++) {
            Cell sourceCell = sourceCells[j];
            Cell targetCell = targetCells[j];
            if (CellUtil.matchingValue(sourceCell, targetCell)) {
              Assert.fail("Cells values should not match for rows between "
                + "90 and 100. Target row id: " + (Bytes.toInt(targetRow.getRow())));
            }
          }
        } else {
          for (int j = 0; j < sourceCells.length; j++) {
            assertCellsMatch(sourceCells[j], targetCells[j], true);
          }
        }
        rowsCount++;
        targetRow = targetScanner.next();
        sourceRow = sourceScanner.next();
      }
      assertEquals("Target expected rows does not match.", expectedRows, rowsCount);
    }
  }

  /**
   * Runs the {@link SyncTable} tool and asserts that it exits with code 0.
   * @param sourceTableName source table
   * @param targetTableName target table
   * @param testDir         directory holding the output of {@link #hashSourceTable}
   * @param options         extra command line options, placed before the positional arguments
   * @return the counters exposed by the tool after the run
   */
  private Counters syncTables(TableName sourceTableName, TableName targetTableName, Path testDir,
    String... options) throws Exception {
    SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration());
    String[] args = Arrays.copyOf(options, options.length + 3);
    args[options.length] = testDir.toString();
    args[options.length + 1] = sourceTableName.getNameAsString();
    args[options.length + 2] = targetTableName.getNameAsString();
    int code = syncTable.run(args);
    assertEquals("sync table job failed", 0, code);

    LOG.info("Sync tables completed");
    return syncTable.counters;
  }

  /**
   * Runs the {@link HashTable} tool over the source table, asserts that it exits with code 0 and
   * checks the metadata it wrote to {@code testDir}.
   * @param sourceTableName table to hash
   * @param testDir         output directory of the hash job
   * @param options         extra command line options, placed before the fixed ones
   */
  private void hashSourceTable(TableName sourceTableName, Path testDir, String... options)
    throws Exception {
    int numHashFiles = 3;
    long batchSize = 100; // should be 2 batches per region
    int scanBatch = 1;
    HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
    String[] args = Arrays.copyOf(options, options.length + 5);
    args[options.length] = "--batchsize=" + batchSize;
    args[options.length + 1] = "--numhashfiles=" + numHashFiles;
    args[options.length + 2] = "--scanbatch=" + scanBatch;
    args[options.length + 3] = sourceTableName.getNameAsString();
    args[options.length + 4] = testDir.toString();
    int code = hashTable.run(args);
    assertEquals("hash table job failed", 0, code);

    FileSystem fs = TEST_UTIL.getTestFileSystem();

    HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
    assertEquals(sourceTableName.getNameAsString(), tableHash.tableName);
    assertEquals(batchSize, tableHash.batchSize);
    assertEquals(numHashFiles, tableHash.numHashFiles);
    assertEquals(numHashFiles - 1, tableHash.partitions.size());

    LOG.info("Hash table completed");
  }

  /**
   * Creates the source and target tables and fills them with 100 int-keyed rows split into groups
   * that each exercise one kind of difference (see the comments in the body for the counters each
   * group contributes to).
   * @param sourceTableName source table to create
   * @param targetTableName target table to create
   * @param timestamps      optional pair of cell timestamps: {@code timestamps[0]} is used for the
   *                        source table and {@code timestamps[1]} for the target table; when
   *                        omitted both tables use the current time
   */
  private void writeTestData(TableName sourceTableName, TableName targetTableName,
    long... timestamps) throws Exception {
    final byte[] family = Bytes.toBytes("family");
    final byte[] column1 = Bytes.toBytes("c1");
    final byte[] column2 = Bytes.toBytes("c2");
    final byte[] value1 = Bytes.toBytes("val1");
    final byte[] value2 = Bytes.toBytes("val2");
    final byte[] value3 = Bytes.toBytes("val3");

    int numRows = 100;
    int sourceRegions = 10;
    int targetRegions = 6;
    final long sourceTs;
    final long targetTs;
    if (ArrayUtils.isEmpty(timestamps)) {
      long current = EnvironmentEdgeManager.currentTime();
      sourceTs = current;
      targetTs = current;
    } else {
      sourceTs = timestamps[0];
      targetTs = timestamps[1];
    }

    try (
      Table sourceTable =
        TEST_UTIL.createTable(sourceTableName, family, generateSplits(numRows, sourceRegions));
      Table targetTable =
        TEST_UTIL.createTable(targetTableName, family, generateSplits(numRows, targetRegions))) {

      int rowIndex = 0;
      // a bunch of identical rows
      for (; rowIndex < 40; rowIndex++) {
        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
        sourcePut.addColumn(family, column1, sourceTs, value1);
        sourcePut.addColumn(family, column2, sourceTs, value2);
        sourceTable.put(sourcePut);

        Put targetPut = new Put(Bytes.toBytes(rowIndex));
        targetPut.addColumn(family, column1, targetTs, value1);
        targetPut.addColumn(family, column2, targetTs, value2);
        targetTable.put(targetPut);
      }
      // some rows only in the source table
      // ROWSWITHDIFFS: 10
      // TARGETMISSINGROWS: 10
      // TARGETMISSINGCELLS: 20
      for (; rowIndex < 50; rowIndex++) {
        Put put = new Put(Bytes.toBytes(rowIndex));
        put.addColumn(family, column1, sourceTs, value1);
        put.addColumn(family, column2, sourceTs, value2);
        sourceTable.put(put);
      }
      // some rows only in the target table
      // ROWSWITHDIFFS: 10
      // SOURCEMISSINGROWS: 10
      // SOURCEMISSINGCELLS: 20
      for (; rowIndex < 60; rowIndex++) {
        Put put = new Put(Bytes.toBytes(rowIndex));
        put.addColumn(family, column1, targetTs, value1);
        put.addColumn(family, column2, targetTs, value2);
        targetTable.put(put);
      }
      // some rows with 1 missing cell in target table
      // ROWSWITHDIFFS: 10
      // TARGETMISSINGCELLS: 10
      for (; rowIndex < 70; rowIndex++) {
        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
        sourcePut.addColumn(family, column1, sourceTs, value1);
        sourcePut.addColumn(family, column2, sourceTs, value2);
        sourceTable.put(sourcePut);

        Put targetPut = new Put(Bytes.toBytes(rowIndex));
        targetPut.addColumn(family, column1, targetTs, value1);
        targetTable.put(targetPut);
      }
      // some rows with 1 missing cell in source table
      // ROWSWITHDIFFS: 10
      // SOURCEMISSINGCELLS: 10
      for (; rowIndex < 80; rowIndex++) {
        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
        sourcePut.addColumn(family, column1, sourceTs, value1);
        sourceTable.put(sourcePut);

        Put targetPut = new Put(Bytes.toBytes(rowIndex));
        targetPut.addColumn(family, column1, targetTs, value1);
        targetPut.addColumn(family, column2, targetTs, value2);
        targetTable.put(targetPut);
      }
      // some rows differing only in timestamp
      // ROWSWITHDIFFS: 10
      // SOURCEMISSINGCELLS: 20
      // TARGETMISSINGCELLS: 20
      for (; rowIndex < 90; rowIndex++) {
        // the qualifier bytes double as the c1 value here; both tables use the same value
        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
        sourcePut.addColumn(family, column1, sourceTs, column1);
        sourcePut.addColumn(family, column2, sourceTs, value2);
        sourceTable.put(sourcePut);

        Put targetPut = new Put(Bytes.toBytes(rowIndex));
        targetPut.addColumn(family, column1, targetTs + 1, column1);
        targetPut.addColumn(family, column2, targetTs - 1, value2);
        targetTable.put(targetPut);
      }
      // some rows with different values
      // ROWSWITHDIFFS: 10
      // DIFFERENTCELLVALUES: 20
      for (; rowIndex < numRows; rowIndex++) {
        Put sourcePut = new Put(Bytes.toBytes(rowIndex));
        sourcePut.addColumn(family, column1, sourceTs, value1);
        sourcePut.addColumn(family, column2, sourceTs, value2);
        sourceTable.put(sourcePut);

        Put targetPut = new Put(Bytes.toBytes(rowIndex));
        targetPut.addColumn(family, column1, targetTs, value3);
        targetPut.addColumn(family, column2, targetTs, value3);
        targetTable.put(targetPut);
      }
    }
  }
}