001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import static org.junit.Assert.fail; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.Optional; 028import java.util.concurrent.atomic.AtomicBoolean; 029import java.util.concurrent.atomic.AtomicInteger; 030import org.apache.hadoop.hbase.Cell; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.CoprocessorEnvironment; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseTestingUtil; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.Waiter; 038import org.apache.hadoop.hbase.codec.KeyValueCodec; 039import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 040import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; 041import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; 042import org.apache.hadoop.hbase.coprocessor.MasterObserver; 043import org.apache.hadoop.hbase.coprocessor.ObserverContext; 044import org.apache.hadoop.hbase.master.RegionPlan; 045import org.apache.hadoop.hbase.testclassification.FlakeyTests; 046import org.apache.hadoop.hbase.testclassification.MediumTests; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.apache.hadoop.hbase.util.JVMClusterUtil; 049import org.junit.AfterClass; 050import org.junit.Assert; 051import org.junit.Before; 052import org.junit.BeforeClass; 053import org.junit.ClassRule; 054import org.junit.Test; 055import org.junit.experimental.categories.Category; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059@Category({ MediumTests.class, FlakeyTests.class }) 060public class TestMultiParallel { 061 062 @ClassRule 063 public static final HBaseClassTestRule CLASS_RULE = 064 HBaseClassTestRule.forClass(TestMultiParallel.class); 065 066 private static final Logger LOG = LoggerFactory.getLogger(TestMultiParallel.class); 067 068 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 069 private static final byte[] VALUE = Bytes.toBytes("value"); 070 private static final byte[] QUALIFIER = Bytes.toBytes("qual"); 071 private static final String FAMILY = "family"; 072 private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table"); 073 private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY); 074 private static final byte[] ONE_ROW = Bytes.toBytes("xxx"); 075 private static final byte[][] KEYS = makeKeys(); 076 077 private static final int slaves = 5; // also used for testing HTable pool size 078 private static Connection CONNECTION; 079 080 @BeforeClass 081 public static void beforeClass() throws Exception { 082 // Uncomment the following lines if more verbosity is needed for 083 // debugging (see HBASE-12285 for details). 084 // ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL); 085 // ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL); 086 // ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL); 087 UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY, 088 KeyValueCodec.class.getCanonicalName()); 089 // Disable table on master for now as the feature is broken 090 // UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true); 091 // We used to ask for system tables on Master exclusively but not needed by test and doesn't 092 // work anyways -- so commented out. 093 // UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true); 094 UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, 095 MyMasterObserver.class.getName()); 096 UTIL.startMiniCluster(slaves); 097 Table t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY)); 098 UTIL.waitTableEnabled(TEST_TABLE); 099 t.close(); 100 CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration()); 101 assertTrue(MyMasterObserver.start.get()); 102 } 103 104 @AfterClass 105 public static void afterClass() throws Exception { 106 CONNECTION.close(); 107 UTIL.shutdownMiniCluster(); 108 } 109 110 @Before 111 public void before() throws Exception { 112 final int balanceCount = MyMasterObserver.postBalanceCount.get(); 113 LOG.info("before"); 114 if (UTIL.ensureSomeRegionServersAvailable(slaves)) { 115 // Distribute regions 116 UTIL.getMiniHBaseCluster().getMaster().balance(); 117 // Some plans are created. 118 if (MyMasterObserver.postBalanceCount.get() > balanceCount) { 119 // It is necessary to wait the move procedure to start. 120 // Otherwise, the next wait may pass immediately. 121 UTIL.waitFor(3 * 1000, 100, false, () -> UTIL.getMiniHBaseCluster().getMaster() 122 .getAssignmentManager().hasRegionsInTransition()); 123 } 124 125 // Wait until completing balance 126 UTIL.waitUntilAllRegionsAssigned(TEST_TABLE); 127 } 128 LOG.info("before done"); 129 } 130 131 private static byte[][] makeKeys() { 132 byte[][] starterKeys = HBaseTestingUtil.KEYS; 133 // Create a "non-uniform" test set with the following characteristics: 134 // a) Unequal number of keys per region 135 136 // Don't use integer as a multiple, so that we have a number of keys that is 137 // not a multiple of the number of regions 138 int numKeys = (int) (starterKeys.length * 10.33F); 139 140 List<byte[]> keys = new ArrayList<>(); 141 for (int i = 0; i < numKeys; i++) { 142 int kIdx = i % starterKeys.length; 143 byte[] k = starterKeys[kIdx]; 144 byte[] cp = new byte[k.length + 1]; 145 System.arraycopy(k, 0, cp, 0, k.length); 146 cp[k.length] = new Integer(i % 256).byteValue(); 147 keys.add(cp); 148 } 149 150 // b) Same duplicate keys (showing multiple Gets/Puts to the same row, which 151 // should work) 152 // c) keys are not in sorted order (within a region), to ensure that the 153 // sorting code and index mapping doesn't break the functionality 154 for (int i = 0; i < 100; i++) { 155 int kIdx = i % starterKeys.length; 156 byte[] k = starterKeys[kIdx]; 157 byte[] cp = new byte[k.length + 1]; 158 System.arraycopy(k, 0, cp, 0, k.length); 159 cp[k.length] = new Integer(i % 256).byteValue(); 160 keys.add(cp); 161 } 162 return keys.toArray(new byte[][] { new byte[] {} }); 163 } 164 165 @Test 166 public void testBatchWithGet() throws Exception { 167 LOG.info("test=testBatchWithGet"); 168 Table table = UTIL.getConnection().getTable(TEST_TABLE); 169 170 // load test data 171 List<Put> puts = constructPutRequests(); 172 table.batch(puts, null); 173 174 // create a list of gets and run it 175 List<Row> gets = new ArrayList<>(); 176 for (byte[] k : KEYS) { 177 Get get = new Get(k); 178 get.addColumn(BYTES_FAMILY, QUALIFIER); 179 gets.add(get); 180 } 181 Result[] multiRes = new Result[gets.size()]; 182 table.batch(gets, multiRes); 183 184 // Same gets using individual call API 185 List<Result> singleRes = new ArrayList<>(); 186 for (Row get : gets) { 187 singleRes.add(table.get((Get) get)); 188 } 189 // Compare results 190 Assert.assertEquals(singleRes.size(), multiRes.length); 191 for (int i = 0; i < singleRes.size(); i++) { 192 Assert.assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER)); 193 Cell[] singleKvs = singleRes.get(i).rawCells(); 194 Cell[] multiKvs = multiRes[i].rawCells(); 195 for (int j = 0; j < singleKvs.length; j++) { 196 Assert.assertEquals(singleKvs[j], multiKvs[j]); 197 Assert.assertEquals(0, 198 Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]), CellUtil.cloneValue(multiKvs[j]))); 199 } 200 } 201 table.close(); 202 } 203 204 @Test 205 public void testBadFam() throws Exception { 206 LOG.info("test=testBadFam"); 207 Table table = UTIL.getConnection().getTable(TEST_TABLE); 208 209 List<Row> actions = new ArrayList<>(); 210 Put p = new Put(Bytes.toBytes("row1")); 211 p.addColumn(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value")); 212 actions.add(p); 213 p = new Put(Bytes.toBytes("row2")); 214 p.addColumn(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value")); 215 actions.add(p); 216 217 // row1 and row2 should be in the same region. 218 219 Object[] r = new Object[actions.size()]; 220 try { 221 table.batch(actions, r); 222 fail(); 223 } catch (RetriesExhaustedException ex) { 224 // expected 225 } 226 assertEquals(2, r.length); 227 assertTrue(r[0] instanceof Throwable); 228 assertTrue(r[1] instanceof Result); 229 table.close(); 230 } 231 232 @Test 233 public void testFlushCommitsNoAbort() throws Exception { 234 LOG.info("test=testFlushCommitsNoAbort"); 235 doTestFlushCommits(false); 236 } 237 238 /** 239 * Only run one Multi test with a forced RegionServer abort. Otherwise, the unit tests will take 240 * an unnecessarily long time to run. 241 */ 242 @Test 243 public void testFlushCommitsWithAbort() throws Exception { 244 LOG.info("test=testFlushCommitsWithAbort"); 245 doTestFlushCommits(true); 246 } 247 248 /** 249 * Set table auto flush to false and test flushing commits 250 * @param doAbort true if abort one regionserver in the testing 251 */ 252 private void doTestFlushCommits(boolean doAbort) throws Exception { 253 // Load the data 254 LOG.info("get new table"); 255 Table table = UTIL.getConnection().getTable(TEST_TABLE); 256 257 LOG.info("constructPutRequests"); 258 List<Put> puts = constructPutRequests(); 259 table.put(puts); 260 LOG.info("puts"); 261 final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size(); 262 assert liveRScount > 0; 263 JVMClusterUtil.RegionServerThread liveRS = 264 UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0); 265 if (doAbort) { 266 liveRS.getRegionServer().abort("Aborting for tests", new Exception("doTestFlushCommits")); 267 // If we wait for no regions being online after we abort the server, we 268 // could ensure the master has re-assigned the regions on killed server 269 // after writing successfully. It means the server we aborted is dead 270 // and detected by matser 271 while (liveRS.getRegionServer().getNumberOfOnlineRegions() != 0) { 272 Thread.sleep(100); 273 } 274 // try putting more keys after the abort. same key/qual... just validating 275 // no exceptions thrown 276 puts = constructPutRequests(); 277 table.put(puts); 278 } 279 280 LOG.info("validating loaded data"); 281 validateLoadedData(table); 282 283 // Validate server and region count 284 List<JVMClusterUtil.RegionServerThread> liveRSs = 285 UTIL.getMiniHBaseCluster().getLiveRegionServerThreads(); 286 int count = 0; 287 for (JVMClusterUtil.RegionServerThread t : liveRSs) { 288 count++; 289 LOG.info("Count=" + count + ", Alive=" + t.getRegionServer()); 290 } 291 LOG.info("Count=" + count); 292 Assert.assertEquals("Server count=" + count + ", abort=" + doAbort, 293 (doAbort ? (liveRScount - 1) : liveRScount), count); 294 if (doAbort) { 295 UTIL.getMiniHBaseCluster().waitOnRegionServer(0); 296 UTIL.waitFor(15 * 1000, new Waiter.Predicate<Exception>() { 297 @Override 298 public boolean evaluate() throws Exception { 299 // We disable regions on master so the count should be liveRScount - 1 300 return UTIL.getMiniHBaseCluster().getMaster().getClusterMetrics().getLiveServerMetrics() 301 .size() == liveRScount - 1; 302 } 303 }); 304 UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition()); 305 } 306 307 table.close(); 308 LOG.info("done"); 309 } 310 311 @Test 312 public void testBatchWithPut() throws Exception { 313 LOG.info("test=testBatchWithPut"); 314 Table table = CONNECTION.getTable(TEST_TABLE); 315 // put multiple rows using a batch 316 List<Put> puts = constructPutRequests(); 317 318 Object[] results = new Object[puts.size()]; 319 table.batch(puts, results); 320 validateSizeAndEmpty(results, KEYS.length); 321 322 if (true) { 323 int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size(); 324 assert liveRScount > 0; 325 JVMClusterUtil.RegionServerThread liveRS = 326 UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0); 327 liveRS.getRegionServer().abort("Aborting for tests", new Exception("testBatchWithPut")); 328 puts = constructPutRequests(); 329 try { 330 results = new Object[puts.size()]; 331 table.batch(puts, results); 332 } catch (RetriesExhaustedWithDetailsException ree) { 333 LOG.info(ree.getExhaustiveDescription()); 334 table.close(); 335 throw ree; 336 } 337 validateSizeAndEmpty(results, KEYS.length); 338 } 339 340 validateLoadedData(table); 341 table.close(); 342 } 343 344 @Test 345 public void testBatchWithDelete() throws Exception { 346 LOG.info("test=testBatchWithDelete"); 347 Table table = UTIL.getConnection().getTable(TEST_TABLE); 348 349 // Load some data 350 List<Put> puts = constructPutRequests(); 351 Object[] results = new Object[puts.size()]; 352 table.batch(puts, results); 353 validateSizeAndEmpty(results, KEYS.length); 354 355 // Deletes 356 List<Row> deletes = new ArrayList<>(); 357 for (int i = 0; i < KEYS.length; i++) { 358 Delete delete = new Delete(KEYS[i]); 359 delete.addFamily(BYTES_FAMILY); 360 deletes.add(delete); 361 } 362 results = new Object[deletes.size()]; 363 table.batch(deletes, results); 364 validateSizeAndEmpty(results, KEYS.length); 365 366 // Get to make sure ... 367 for (byte[] k : KEYS) { 368 Get get = new Get(k); 369 get.addColumn(BYTES_FAMILY, QUALIFIER); 370 Assert.assertFalse(table.exists(get)); 371 } 372 table.close(); 373 } 374 375 @Test 376 public void testHTableDeleteWithList() throws Exception { 377 LOG.info("test=testHTableDeleteWithList"); 378 Table table = UTIL.getConnection().getTable(TEST_TABLE); 379 380 // Load some data 381 List<Put> puts = constructPutRequests(); 382 Object[] results = new Object[puts.size()]; 383 table.batch(puts, results); 384 validateSizeAndEmpty(results, KEYS.length); 385 386 // Deletes 387 ArrayList<Delete> deletes = new ArrayList<>(); 388 for (int i = 0; i < KEYS.length; i++) { 389 Delete delete = new Delete(KEYS[i]); 390 delete.addFamily(BYTES_FAMILY); 391 deletes.add(delete); 392 } 393 table.delete(deletes); 394 395 // Get to make sure ... 396 for (byte[] k : KEYS) { 397 Get get = new Get(k); 398 get.addColumn(BYTES_FAMILY, QUALIFIER); 399 Assert.assertFalse(table.exists(get)); 400 } 401 table.close(); 402 } 403 404 @Test 405 public void testBatchWithManyColsInOneRowGetAndPut() throws Exception { 406 LOG.info("test=testBatchWithManyColsInOneRowGetAndPut"); 407 Table table = UTIL.getConnection().getTable(TEST_TABLE); 408 409 List<Row> puts = new ArrayList<>(); 410 for (int i = 0; i < 100; i++) { 411 Put put = new Put(ONE_ROW); 412 byte[] qual = Bytes.toBytes("column" + i); 413 put.addColumn(BYTES_FAMILY, qual, VALUE); 414 puts.add(put); 415 } 416 Object[] results = new Object[puts.size()]; 417 table.batch(puts, results); 418 419 // validate 420 validateSizeAndEmpty(results, 100); 421 422 // get the data back and validate that it is correct 423 List<Row> gets = new ArrayList<>(); 424 for (int i = 0; i < 100; i++) { 425 Get get = new Get(ONE_ROW); 426 byte[] qual = Bytes.toBytes("column" + i); 427 get.addColumn(BYTES_FAMILY, qual); 428 gets.add(get); 429 } 430 431 Object[] multiRes = new Object[gets.size()]; 432 table.batch(gets, multiRes); 433 434 int idx = 0; 435 for (Object r : multiRes) { 436 byte[] qual = Bytes.toBytes("column" + idx); 437 validateResult(r, qual, VALUE); 438 idx++; 439 } 440 table.close(); 441 } 442 443 @Test 444 public void testBatchWithIncrementAndAppend() throws Exception { 445 LOG.info("test=testBatchWithIncrementAndAppend"); 446 final byte[] QUAL1 = Bytes.toBytes("qual1"); 447 final byte[] QUAL2 = Bytes.toBytes("qual2"); 448 final byte[] QUAL3 = Bytes.toBytes("qual3"); 449 final byte[] QUAL4 = Bytes.toBytes("qual4"); 450 Table table = UTIL.getConnection().getTable(TEST_TABLE); 451 Delete d = new Delete(ONE_ROW); 452 table.delete(d); 453 Put put = new Put(ONE_ROW); 454 put.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("abc")); 455 put.addColumn(BYTES_FAMILY, QUAL2, Bytes.toBytes(1L)); 456 table.put(put); 457 458 Increment inc = new Increment(ONE_ROW); 459 inc.addColumn(BYTES_FAMILY, QUAL2, 1); 460 inc.addColumn(BYTES_FAMILY, QUAL3, 1); 461 462 Append a = new Append(ONE_ROW); 463 a.addColumn(BYTES_FAMILY, QUAL1, Bytes.toBytes("def")); 464 a.addColumn(BYTES_FAMILY, QUAL4, Bytes.toBytes("xyz")); 465 List<Row> actions = new ArrayList<>(); 466 actions.add(inc); 467 actions.add(a); 468 469 Object[] multiRes = new Object[actions.size()]; 470 table.batch(actions, multiRes); 471 validateResult(multiRes[1], QUAL1, Bytes.toBytes("abcdef")); 472 validateResult(multiRes[1], QUAL4, Bytes.toBytes("xyz")); 473 validateResult(multiRes[0], QUAL2, Bytes.toBytes(2L)); 474 validateResult(multiRes[0], QUAL3, Bytes.toBytes(1L)); 475 table.close(); 476 } 477 478 @Test 479 public void testBatchWithMixedActions() throws Exception { 480 LOG.info("test=testBatchWithMixedActions"); 481 Table table = UTIL.getConnection().getTable(TEST_TABLE); 482 483 // Load some data to start 484 List<Put> puts = constructPutRequests(); 485 Object[] results = new Object[puts.size()]; 486 table.batch(puts, results); 487 validateSizeAndEmpty(results, KEYS.length); 488 489 // Batch: get, get, put(new col), delete, get, get of put, get of deleted, 490 // put 491 List<Row> actions = new ArrayList<>(); 492 493 byte[] qual2 = Bytes.toBytes("qual2"); 494 byte[] val2 = Bytes.toBytes("putvalue2"); 495 496 // 0 get 497 Get get = new Get(KEYS[10]); 498 get.addColumn(BYTES_FAMILY, QUALIFIER); 499 actions.add(get); 500 501 // 1 get 502 get = new Get(KEYS[11]); 503 get.addColumn(BYTES_FAMILY, QUALIFIER); 504 actions.add(get); 505 506 // 2 put of new column 507 Put put = new Put(KEYS[10]); 508 put.addColumn(BYTES_FAMILY, qual2, val2); 509 actions.add(put); 510 511 // 3 delete 512 Delete delete = new Delete(KEYS[20]); 513 delete.addFamily(BYTES_FAMILY); 514 actions.add(delete); 515 516 // 4 get 517 get = new Get(KEYS[30]); 518 get.addColumn(BYTES_FAMILY, QUALIFIER); 519 actions.add(get); 520 521 // There used to be a 'get' of a previous put here, but removed 522 // since this API really cannot guarantee order in terms of mixed 523 // get/puts. 524 525 // 5 put of new column 526 put = new Put(KEYS[40]); 527 put.addColumn(BYTES_FAMILY, qual2, val2); 528 actions.add(put); 529 530 // 6 RowMutations 531 RowMutations rm = new RowMutations(KEYS[50]); 532 put = new Put(KEYS[50]); 533 put.addColumn(BYTES_FAMILY, qual2, val2); 534 rm.add((Mutation) put); 535 byte[] qual3 = Bytes.toBytes("qual3"); 536 byte[] val3 = Bytes.toBytes("putvalue3"); 537 put = new Put(KEYS[50]); 538 put.addColumn(BYTES_FAMILY, qual3, val3); 539 rm.add((Mutation) put); 540 actions.add(rm); 541 542 // 7 Add another Get to the mixed sequence after RowMutations 543 get = new Get(KEYS[10]); 544 get.addColumn(BYTES_FAMILY, QUALIFIER); 545 actions.add(get); 546 547 results = new Object[actions.size()]; 548 table.batch(actions, results); 549 550 // Validation 551 552 validateResult(results[0]); 553 validateResult(results[1]); 554 validateEmpty(results[3]); 555 validateResult(results[4]); 556 validateEmpty(results[5]); 557 validateEmpty(results[6]); 558 validateResult(results[7]); 559 560 // validate last put, externally from the batch 561 get = new Get(KEYS[40]); 562 get.addColumn(BYTES_FAMILY, qual2); 563 Result r = table.get(get); 564 validateResult(r, qual2, val2); 565 566 // validate last RowMutations, externally from the batch 567 get = new Get(KEYS[50]); 568 get.addColumn(BYTES_FAMILY, qual2); 569 r = table.get(get); 570 validateResult(r, qual2, val2); 571 572 get = new Get(KEYS[50]); 573 get.addColumn(BYTES_FAMILY, qual3); 574 r = table.get(get); 575 validateResult(r, qual3, val3); 576 577 table.close(); 578 } 579 580 // // Helper methods //// 581 582 private void validateResult(Object r) { 583 validateResult(r, QUALIFIER, VALUE); 584 } 585 586 private void validateResult(Object r1, byte[] qual, byte[] val) { 587 Result r = (Result) r1; 588 Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual)); 589 byte[] value = r.getValue(BYTES_FAMILY, qual); 590 if (0 != Bytes.compareTo(val, value)) { 591 fail("Expected [" + Bytes.toStringBinary(val) + "] but got [" + Bytes.toStringBinary(value) 592 + "]"); 593 } 594 } 595 596 private List<Put> constructPutRequests() { 597 List<Put> puts = new ArrayList<>(); 598 for (byte[] k : KEYS) { 599 Put put = new Put(k); 600 put.addColumn(BYTES_FAMILY, QUALIFIER, VALUE); 601 puts.add(put); 602 } 603 return puts; 604 } 605 606 private void validateLoadedData(Table table) throws IOException { 607 // get the data back and validate that it is correct 608 LOG.info("Validating data on " + table); 609 List<Get> gets = new ArrayList<>(); 610 for (byte[] k : KEYS) { 611 Get get = new Get(k); 612 get.addColumn(BYTES_FAMILY, QUALIFIER); 613 gets.add(get); 614 } 615 int retryNum = 10; 616 Result[] results = null; 617 do { 618 results = table.get(gets); 619 boolean finished = true; 620 for (Result result : results) { 621 if (result.isEmpty()) { 622 finished = false; 623 break; 624 } 625 } 626 if (finished) { 627 break; 628 } 629 try { 630 Thread.sleep(10); 631 } catch (InterruptedException e) { 632 } 633 retryNum--; 634 } while (retryNum > 0); 635 636 if (retryNum == 0) { 637 fail("Timeout for validate data"); 638 } else { 639 if (results != null) { 640 for (Result r : results) { 641 Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER)); 642 Assert.assertEquals(0, Bytes.compareTo(VALUE, r.getValue(BYTES_FAMILY, QUALIFIER))); 643 } 644 LOG.info("Validating data on " + table + " successfully!"); 645 } 646 } 647 } 648 649 private void validateEmpty(Object r1) { 650 Result result = (Result) r1; 651 Assert.assertTrue(result != null); 652 Assert.assertTrue(result.isEmpty()); 653 } 654 655 private void validateSizeAndEmpty(Object[] results, int expectedSize) { 656 // Validate got back the same number of Result objects, all empty 657 Assert.assertEquals(expectedSize, results.length); 658 for (Object result : results) { 659 validateEmpty(result); 660 } 661 } 662 663 public static class MyMasterObserver implements MasterObserver, MasterCoprocessor { 664 private static final AtomicInteger postBalanceCount = new AtomicInteger(0); 665 private static final AtomicBoolean start = new AtomicBoolean(false); 666 667 @Override 668 public void start(CoprocessorEnvironment env) throws IOException { 669 start.set(true); 670 } 671 672 @Override 673 public Optional<MasterObserver> getMasterObserver() { 674 return Optional.of(this); 675 } 676 677 @Override 678 public void postBalance(final ObserverContext<MasterCoprocessorEnvironment> ctx, 679 BalanceRequest request, List<RegionPlan> plans) throws IOException { 680 if (!plans.isEmpty()) { 681 postBalanceCount.incrementAndGet(); 682 } 683 } 684 } 685}