001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.List; 031import java.util.Optional; 032import java.util.Random; 033import java.util.concurrent.CountDownLatch; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import java.util.concurrent.ThreadLocalRandom; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicBoolean; 039import java.util.concurrent.atomic.AtomicInteger; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.hbase.Cell; 042import org.apache.hadoop.hbase.CellUtil; 043import org.apache.hadoop.hbase.Coprocessor; 044import org.apache.hadoop.hbase.HBaseClassTestRule; 045import org.apache.hadoop.hbase.HBaseTestingUtil; 046import org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.HRegionLocation; 048import org.apache.hadoop.hbase.RegionMetrics; 049import org.apache.hadoop.hbase.ServerName; 050import org.apache.hadoop.hbase.TableName; 051import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 052import org.apache.hadoop.hbase.coprocessor.ObserverContext; 053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 054import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 055import org.apache.hadoop.hbase.coprocessor.RegionObserver; 056import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 057import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 058import org.apache.hadoop.hbase.ipc.RpcClient; 059import org.apache.hadoop.hbase.ipc.RpcClientFactory; 060import org.apache.hadoop.hbase.ipc.ServerRpcController; 061import org.apache.hadoop.hbase.regionserver.HRegion; 062import org.apache.hadoop.hbase.regionserver.HRegionServer; 063import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; 064import org.apache.hadoop.hbase.regionserver.RegionScanner; 065import org.apache.hadoop.hbase.testclassification.ClientTests; 066import org.apache.hadoop.hbase.testclassification.LargeTests; 067import org.apache.hadoop.hbase.util.Bytes; 068import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 069import org.junit.After; 070import org.junit.AfterClass; 071import org.junit.Assert; 072import org.junit.Before; 073import org.junit.BeforeClass; 074import org.junit.ClassRule; 075import org.junit.Rule; 076import org.junit.Test; 077import org.junit.experimental.categories.Category; 078import org.junit.rules.TestName; 079import org.slf4j.Logger; 080import org.slf4j.LoggerFactory; 081 082import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos; 085 086@Category({ LargeTests.class, ClientTests.class }) 087public class TestFromClientSide3 { 088 089 @ClassRule 090 public static final HBaseClassTestRule CLASS_RULE = 091 HBaseClassTestRule.forClass(TestFromClientSide3.class); 092 093 private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide3.class); 094 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 095 private static final int WAITTABLE_MILLIS = 10000; 096 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 097 private static int SLAVES = 3; 098 private static final byte[] ROW = Bytes.toBytes("testRow"); 099 private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow"); 100 private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 101 private static final byte[] VALUE = Bytes.toBytes("testValue"); 102 private static final byte[] COL_QUAL = Bytes.toBytes("f1"); 103 private static final byte[] VAL_BYTES = Bytes.toBytes("v1"); 104 private static final byte[] ROW_BYTES = Bytes.toBytes("r1"); 105 106 @Rule 107 public TestName name = new TestName(); 108 private TableName tableName; 109 110 /** 111 * @throws java.lang.Exception 112 */ 113 @BeforeClass 114 public static void setUpBeforeClass() throws Exception { 115 TEST_UTIL.startMiniCluster(SLAVES); 116 } 117 118 /** 119 * @throws java.lang.Exception 120 */ 121 @AfterClass 122 public static void tearDownAfterClass() throws Exception { 123 TEST_UTIL.shutdownMiniCluster(); 124 } 125 126 @Before 127 public void setUp() throws Exception { 128 tableName = TableName.valueOf(name.getMethodName()); 129 } 130 131 @After 132 public void tearDown() throws Exception { 133 for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) { 134 LOG.info("Tear down, remove table=" + htd.getTableName()); 135 TEST_UTIL.deleteTable(htd.getTableName()); 136 } 137 } 138 139 private void randomCFPuts(Table table, byte[] row, byte[] family, int nPuts) throws Exception { 140 Put put = new Put(row); 141 Random rand = ThreadLocalRandom.current(); 142 for (int i = 0; i < nPuts; i++) { 143 byte[] qualifier = Bytes.toBytes(rand.nextInt()); 144 byte[] value = Bytes.toBytes(rand.nextInt()); 145 put.addColumn(family, qualifier, value); 146 } 147 table.put(put); 148 } 149 150 private void performMultiplePutAndFlush(Admin admin, Table table, byte[] row, byte[] family, 151 int nFlushes, int nPuts) throws Exception { 152 for (int i = 0; i < nFlushes; i++) { 153 randomCFPuts(table, row, family, nPuts); 154 admin.flush(table.getName()); 155 } 156 } 157 158 private static List<Cell> toList(ResultScanner scanner) { 159 try { 160 List<Cell> cells = new ArrayList<>(); 161 for (Result r : scanner) { 162 cells.addAll(r.listCells()); 163 } 164 return cells; 165 } finally { 166 scanner.close(); 167 } 168 } 169 170 @Test 171 public void testScanAfterDeletingSpecifiedRow() throws IOException, InterruptedException { 172 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 173 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 174 byte[] row = Bytes.toBytes("SpecifiedRow"); 175 byte[] value0 = Bytes.toBytes("value_0"); 176 byte[] value1 = Bytes.toBytes("value_1"); 177 Put put = new Put(row); 178 put.addColumn(FAMILY, QUALIFIER, VALUE); 179 table.put(put); 180 Delete d = new Delete(row); 181 table.delete(d); 182 put = new Put(row); 183 put.addColumn(FAMILY, null, value0); 184 table.put(put); 185 put = new Put(row); 186 put.addColumn(FAMILY, null, value1); 187 table.put(put); 188 List<Cell> cells = toList(table.getScanner(new Scan())); 189 assertEquals(1, cells.size()); 190 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 191 192 cells = toList(table.getScanner(new Scan().addFamily(FAMILY))); 193 assertEquals(1, cells.size()); 194 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 195 196 cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER))); 197 assertEquals(0, cells.size()); 198 199 TEST_UTIL.getAdmin().flush(tableName); 200 cells = toList(table.getScanner(new Scan())); 201 assertEquals(1, cells.size()); 202 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 203 204 cells = toList(table.getScanner(new Scan().addFamily(FAMILY))); 205 assertEquals(1, cells.size()); 206 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0)))); 207 208 cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER))); 209 assertEquals(0, cells.size()); 210 } 211 } 212 213 @Test 214 public void testScanAfterDeletingSpecifiedRowV2() throws IOException, InterruptedException { 215 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 216 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 217 byte[] row = Bytes.toBytes("SpecifiedRow"); 218 byte[] qual0 = Bytes.toBytes("qual0"); 219 byte[] qual1 = Bytes.toBytes("qual1"); 220 long now = EnvironmentEdgeManager.currentTime(); 221 Delete d = new Delete(row, now); 222 table.delete(d); 223 224 Put put = new Put(row); 225 put.addColumn(FAMILY, null, now + 1, VALUE); 226 table.put(put); 227 228 put = new Put(row); 229 put.addColumn(FAMILY, qual1, now + 2, qual1); 230 table.put(put); 231 232 put = new Put(row); 233 put.addColumn(FAMILY, qual0, now + 3, qual0); 234 table.put(put); 235 236 Result r = table.get(new Get(row)); 237 assertEquals(r.toString(), 3, r.size()); 238 assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); 239 assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1]))); 240 assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2]))); 241 242 TEST_UTIL.getAdmin().flush(tableName); 243 r = table.get(new Get(row)); 244 assertEquals(3, r.size()); 245 assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0]))); 246 assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1]))); 247 assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2]))); 248 } 249 } 250 251 private int getStoreFileCount(Admin admin, ServerName serverName, RegionInfo region) 252 throws IOException { 253 for (RegionMetrics metrics : admin.getRegionMetrics(serverName, region.getTable())) { 254 if (Bytes.equals(region.getRegionName(), metrics.getRegionName())) { 255 return metrics.getStoreFileCount(); 256 } 257 } 258 return 0; 259 } 260 261 // override the config settings at the CF level and ensure priority 262 @Test 263 public void testAdvancedConfigOverride() throws Exception { 264 /* 265 * Overall idea: (1) create 3 store files and issue a compaction. config's compaction.min == 3, 266 * so should work. (2) Increase the compaction.min toggle in the HTD to 5 and modify table. If 267 * we use the HTD value instead of the default config value, adding 3 files and issuing a 268 * compaction SHOULD NOT work (3) Decrease the compaction.min toggle in the HCD to 2 and modify 269 * table. The CF schema should override the Table schema and now cause a minor compaction. 270 */ 271 TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 3); 272 273 final TableName tableName = TableName.valueOf(name.getMethodName()); 274 try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 10)) { 275 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 276 Admin admin = TEST_UTIL.getAdmin(); 277 278 // Create 3 store files. 279 byte[] row = Bytes.toBytes(ThreadLocalRandom.current().nextInt()); 280 performMultiplePutAndFlush(admin, table, row, FAMILY, 3, 100); 281 282 try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 283 // Verify we have multiple store files. 284 HRegionLocation loc = locator.getRegionLocation(row, true); 285 assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) > 1); 286 287 // Issue a compaction request 288 admin.compact(tableName); 289 290 // poll wait for the compactions to happen 291 for (int i = 0; i < 10 * 1000 / 40; ++i) { 292 // The number of store files after compaction should be lesser. 293 loc = locator.getRegionLocation(row, true); 294 if (!loc.getRegion().isOffline()) { 295 if (getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) <= 1) { 296 break; 297 } 298 } 299 Thread.sleep(40); 300 } 301 // verify the compactions took place and that we didn't just time out 302 assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) <= 1); 303 304 // change the compaction.min config option for this table to 5 305 LOG.info("hbase.hstore.compaction.min should now be 5"); 306 TableDescriptor htd = TableDescriptorBuilder.newBuilder(table.getDescriptor()) 307 .setValue("hbase.hstore.compaction.min", String.valueOf(5)).build(); 308 admin.modifyTable(htd); 309 LOG.info("alter status finished"); 310 311 // Create 3 more store files. 312 performMultiplePutAndFlush(admin, table, row, FAMILY, 3, 10); 313 314 // Issue a compaction request 315 admin.compact(tableName); 316 317 // This time, the compaction request should not happen 318 Thread.sleep(10 * 1000); 319 loc = locator.getRegionLocation(row, true); 320 int sfCount = getStoreFileCount(admin, loc.getServerName(), loc.getRegion()); 321 assertTrue(sfCount > 1); 322 323 // change an individual CF's config option to 2 & online schema update 324 LOG.info("hbase.hstore.compaction.min should now be 2"); 325 htd = TableDescriptorBuilder.newBuilder(htd) 326 .modifyColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(htd.getColumnFamily(FAMILY)) 327 .setValue("hbase.hstore.compaction.min", String.valueOf(2)).build()) 328 .build(); 329 admin.modifyTable(htd); 330 LOG.info("alter status finished"); 331 332 // Issue a compaction request 333 admin.compact(tableName); 334 335 // poll wait for the compactions to happen 336 for (int i = 0; i < 10 * 1000 / 40; ++i) { 337 loc = locator.getRegionLocation(row, true); 338 try { 339 if (getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) < sfCount) { 340 break; 341 } 342 } catch (Exception e) { 343 LOG.debug("Waiting for region to come online: " 344 + Bytes.toStringBinary(loc.getRegion().getRegionName())); 345 } 346 Thread.sleep(40); 347 } 348 349 // verify the compaction took place and that we didn't just time out 350 assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) < sfCount); 351 352 // Finally, ensure that we can remove a custom config value after we made it 353 LOG.info("Removing CF config value"); 354 LOG.info("hbase.hstore.compaction.min should now be 5"); 355 htd = TableDescriptorBuilder.newBuilder(htd) 356 .modifyColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(htd.getColumnFamily(FAMILY)) 357 .setValue("hbase.hstore.compaction.min", null).build()) 358 .build(); 359 admin.modifyTable(htd); 360 LOG.info("alter status finished"); 361 assertNull(table.getDescriptor().getColumnFamily(FAMILY) 362 .getValue(Bytes.toBytes("hbase.hstore.compaction.min"))); 363 } 364 } 365 } 366 367 @Test 368 public void testHTableBatchWithEmptyPut() throws IOException, InterruptedException { 369 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 370 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 371 List actions = (List) new ArrayList(); 372 Object[] results = new Object[2]; 373 // create an empty Put 374 Put put1 = new Put(ROW); 375 actions.add(put1); 376 377 Put put2 = new Put(ANOTHERROW); 378 put2.addColumn(FAMILY, QUALIFIER, VALUE); 379 actions.add(put2); 380 381 table.batch(actions, results); 382 fail("Empty Put should have failed the batch call"); 383 } catch (IllegalArgumentException iae) { 384 } 385 } 386 387 // Test Table.batch with large amount of mutations against the same key. 388 // It used to trigger read lock's "Maximum lock count exceeded" Error. 389 @Test 390 public void testHTableWithLargeBatch() throws IOException, InterruptedException { 391 int sixtyFourK = 64 * 1024; 392 List actions = new ArrayList(); 393 Object[] results = new Object[(sixtyFourK + 1) * 2]; 394 395 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 396 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 397 398 for (int i = 0; i < sixtyFourK + 1; i++) { 399 Put put1 = new Put(ROW); 400 put1.addColumn(FAMILY, QUALIFIER, VALUE); 401 actions.add(put1); 402 403 Put put2 = new Put(ANOTHERROW); 404 put2.addColumn(FAMILY, QUALIFIER, VALUE); 405 actions.add(put2); 406 } 407 408 table.batch(actions, results); 409 } 410 } 411 412 @Test 413 public void testBatchWithRowMutation() throws Exception { 414 LOG.info("Starting testBatchWithRowMutation"); 415 byte[][] QUALIFIERS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b") }; 416 417 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 418 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 419 420 RowMutations arm = RowMutations 421 .of(Collections.singletonList(new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE))); 422 Object[] batchResult = new Object[1]; 423 table.batch(Arrays.asList(arm), batchResult); 424 425 Get g = new Get(ROW); 426 Result r = table.get(g); 427 assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0]))); 428 429 arm = RowMutations.of(Arrays.asList(new Put(ROW).addColumn(FAMILY, QUALIFIERS[1], VALUE), 430 new Delete(ROW).addColumns(FAMILY, QUALIFIERS[0]))); 431 table.batch(Arrays.asList(arm), batchResult); 432 r = table.get(g); 433 assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1]))); 434 assertNull(r.getValue(FAMILY, QUALIFIERS[0])); 435 436 // Test that we get the correct remote exception for RowMutations from batch() 437 try { 438 arm = RowMutations.of(Collections.singletonList( 439 new Put(ROW).addColumn(new byte[] { 'b', 'o', 'g', 'u', 's' }, QUALIFIERS[0], VALUE))); 440 table.batch(Arrays.asList(arm), batchResult); 441 fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException"); 442 } catch (RetriesExhaustedException e) { 443 String msg = e.getMessage(); 444 assertTrue(msg.contains("NoSuchColumnFamilyException")); 445 } 446 } 447 } 448 449 @Test 450 public void testBatchWithCheckAndMutate() throws Exception { 451 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 452 byte[] row1 = Bytes.toBytes("row1"); 453 byte[] row2 = Bytes.toBytes("row2"); 454 byte[] row3 = Bytes.toBytes("row3"); 455 byte[] row4 = Bytes.toBytes("row4"); 456 byte[] row5 = Bytes.toBytes("row5"); 457 byte[] row6 = Bytes.toBytes("row6"); 458 byte[] row7 = Bytes.toBytes("row7"); 459 460 table 461 .put(Arrays.asList(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), 462 new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), 463 new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), 464 new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")), 465 new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")), 466 new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)), 467 new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")))); 468 469 CheckAndMutate checkAndMutate1 = 470 CheckAndMutate.newBuilder(row1).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 471 .build(new RowMutations(row1) 472 .add(new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g"))) 473 .add(new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A"))) 474 .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L)) 475 .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))); 476 Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B")); 477 RowMutations mutations = 478 new RowMutations(row3).add(new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))) 479 .add(new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) 480 .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L)) 481 .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))); 482 CheckAndMutate checkAndMutate2 = 483 CheckAndMutate.newBuilder(row4).ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a")) 484 .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h"))); 485 Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f")); 486 CheckAndMutate checkAndMutate3 = 487 CheckAndMutate.newBuilder(row6).ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)) 488 .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1)); 489 CheckAndMutate checkAndMutate4 = 490 CheckAndMutate.newBuilder(row7).ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")) 491 .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))); 492 493 List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put, 494 checkAndMutate3, checkAndMutate4); 495 Object[] results = new Object[actions.size()]; 496 table.batch(actions, results); 497 498 CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results[0]; 499 assertTrue(checkAndMutateResult.isSuccess()); 500 assertEquals(3L, 501 Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C")))); 502 assertEquals("d", 503 Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D")))); 504 505 assertEquals("b", Bytes.toString(((Result) results[1]).getValue(FAMILY, Bytes.toBytes("B")))); 506 507 Result result = (Result) results[2]; 508 assertTrue(result.getExists()); 509 assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A")))); 510 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 511 512 checkAndMutateResult = (CheckAndMutateResult) results[3]; 513 assertFalse(checkAndMutateResult.isSuccess()); 514 assertNull(checkAndMutateResult.getResult()); 515 516 assertTrue(((Result) results[4]).isEmpty()); 517 518 checkAndMutateResult = (CheckAndMutateResult) results[5]; 519 assertTrue(checkAndMutateResult.isSuccess()); 520 assertEquals(11, 521 Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("F")))); 522 523 checkAndMutateResult = (CheckAndMutateResult) results[6]; 524 assertTrue(checkAndMutateResult.isSuccess()); 525 assertEquals("gg", 526 Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("G")))); 527 528 result = table.get(new Get(row1)); 529 assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 530 assertNull(result.getValue(FAMILY, Bytes.toBytes("A"))); 531 assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C")))); 532 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 533 534 result = table.get(new Get(row3)); 535 assertNull(result.getValue(FAMILY, Bytes.toBytes("C"))); 536 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 537 assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 538 assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A")))); 539 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 540 541 result = table.get(new Get(row4)); 542 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 543 544 result = table.get(new Get(row5)); 545 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E")))); 546 547 result = table.get(new Get(row6)); 548 assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F")))); 549 550 result = table.get(new Get(row7)); 551 assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G")))); 552 } 553 } 554 555 @Test 556 public void testHTableExistsMethodSingleRegionSingleGet() 557 throws IOException, InterruptedException { 558 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 559 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 560 561 // Test with a single region table. 562 Put put = new Put(ROW); 563 put.addColumn(FAMILY, QUALIFIER, VALUE); 564 565 Get get = new Get(ROW); 566 567 boolean exist = table.exists(get); 568 assertFalse(exist); 569 570 table.put(put); 571 572 exist = table.exists(get); 573 assertTrue(exist); 574 } 575 } 576 577 @Test 578 public void testHTableExistsMethodSingleRegionMultipleGets() 579 throws IOException, InterruptedException { 580 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 581 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 582 583 Put put = new Put(ROW); 584 put.addColumn(FAMILY, QUALIFIER, VALUE); 585 table.put(put); 586 587 List<Get> gets = new ArrayList<>(); 588 gets.add(new Get(ROW)); 589 gets.add(new Get(ANOTHERROW)); 590 591 boolean[] results = table.exists(gets); 592 assertTrue(results[0]); 593 assertFalse(results[1]); 594 } 595 } 596 597 @Test 598 public void testHTableExistsBeforeGet() throws IOException, InterruptedException { 599 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 600 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 601 602 Put put = new Put(ROW); 603 put.addColumn(FAMILY, QUALIFIER, VALUE); 604 table.put(put); 605 606 Get get = new Get(ROW); 607 608 boolean exist = table.exists(get); 609 assertEquals(true, exist); 610 611 Result result = table.get(get); 612 assertEquals(false, result.isEmpty()); 613 assertTrue(Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER))); 614 } 615 } 616 617 @Test 618 public void testHTableExistsAllBeforeGet() throws IOException, InterruptedException { 619 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 620 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 621 622 final byte[] ROW2 = Bytes.add(ROW, Bytes.toBytes("2")); 623 Put put = new Put(ROW); 624 put.addColumn(FAMILY, QUALIFIER, VALUE); 625 table.put(put); 626 put = new Put(ROW2); 627 put.addColumn(FAMILY, QUALIFIER, VALUE); 628 table.put(put); 629 630 Get get = new Get(ROW); 631 Get get2 = new Get(ROW2); 632 ArrayList<Get> getList = new ArrayList(2); 633 getList.add(get); 634 getList.add(get2); 635 636 boolean[] exists = table.exists(getList); 637 assertEquals(true, exists[0]); 638 assertEquals(true, exists[1]); 639 640 Result[] result = table.get(getList); 641 assertEquals(false, result[0].isEmpty()); 642 assertTrue(Bytes.equals(VALUE, result[0].getValue(FAMILY, QUALIFIER))); 643 assertEquals(false, result[1].isEmpty()); 644 assertTrue(Bytes.equals(VALUE, result[1].getValue(FAMILY, QUALIFIER))); 645 } 646 } 647 648 @Test 649 public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception { 650 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1, 651 new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) { 652 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 653 654 Put put = new Put(ROW); 655 put.addColumn(FAMILY, QUALIFIER, VALUE); 656 657 Get get = new Get(ROW); 658 659 boolean exist = table.exists(get); 660 assertFalse(exist); 661 662 table.put(put); 663 664 exist = table.exists(get); 665 assertTrue(exist); 666 } 667 } 668 669 @Test 670 public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception { 671 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1, 672 new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) { 673 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 674 675 Put put = new Put(ROW); 676 put.addColumn(FAMILY, QUALIFIER, VALUE); 677 table.put(put); 678 679 List<Get> gets = new ArrayList<>(); 680 gets.add(new Get(ANOTHERROW)); 681 gets.add(new Get(Bytes.add(ROW, new byte[] { 0x00 }))); 682 gets.add(new Get(ROW)); 683 gets.add(new Get(Bytes.add(ANOTHERROW, new byte[] { 0x00 }))); 684 685 LOG.info("Calling exists"); 686 boolean[] results = table.exists(gets); 687 assertFalse(results[0]); 688 assertFalse(results[1]); 689 assertTrue(results[2]); 690 assertFalse(results[3]); 691 692 // Test with the first region. 693 put = new Put(new byte[] { 0x00 }); 694 put.addColumn(FAMILY, QUALIFIER, VALUE); 695 table.put(put); 696 697 gets = new ArrayList<>(); 698 gets.add(new Get(new byte[] { 0x00 })); 699 gets.add(new Get(new byte[] { 0x00, 0x00 })); 700 results = table.exists(gets); 701 assertTrue(results[0]); 702 assertFalse(results[1]); 703 704 // Test with the last region 705 put = new Put(new byte[] { (byte) 0xff, (byte) 0xff }); 706 put.addColumn(FAMILY, QUALIFIER, VALUE); 707 table.put(put); 708 709 gets = new ArrayList<>(); 710 gets.add(new Get(new byte[] { (byte) 0xff })); 711 gets.add(new Get(new byte[] { (byte) 0xff, (byte) 0xff })); 712 gets.add(new Get(new byte[] { (byte) 0xff, (byte) 0xff, (byte) 0xff })); 713 results = table.exists(gets); 714 assertFalse(results[0]); 715 assertTrue(results[1]); 716 assertFalse(results[2]); 717 } 718 } 719 720 @Test 721 public void testGetEmptyRow() throws Exception { 722 // Create a table and put in 1 row 723 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 724 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 725 726 Put put = new Put(ROW_BYTES); 727 put.addColumn(FAMILY, COL_QUAL, VAL_BYTES); 728 table.put(put); 729 730 // Try getting the row with an empty row key 731 Result res = null; 732 try { 733 res = table.get(new Get(new byte[0])); 734 fail(); 735 } catch (IllegalArgumentException e) { 736 // Expected. 737 } 738 assertTrue(res == null); 739 res = table.get(new Get(Bytes.toBytes("r1-not-exist"))); 740 assertTrue(res.isEmpty() == true); 741 res = table.get(new Get(ROW_BYTES)); 742 assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES)); 743 } 744 } 745 746 @Test 747 public void testConnectionDefaultUsesCodec() throws Exception { 748 try ( 749 RpcClient client = RpcClientFactory.createClient(TEST_UTIL.getConfiguration(), "cluster")) { 750 assertTrue(client.hasCellBlockSupport()); 751 } 752 } 753 754 @Test 755 public void testPutWithPreBatchMutate() throws Exception { 756 testPreBatchMutate(tableName, () -> { 757 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { 758 Put put = new Put(ROW); 759 put.addColumn(FAMILY, QUALIFIER, VALUE); 760 t.put(put); 761 } catch (IOException ex) { 762 throw new RuntimeException(ex); 763 } 764 }); 765 } 766 767 @Test 768 public void testRowMutationsWithPreBatchMutate() throws Exception { 769 testPreBatchMutate(tableName, () -> { 770 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { 771 RowMutations rm = new RowMutations(ROW, 1); 772 Put put = new Put(ROW); 773 put.addColumn(FAMILY, QUALIFIER, VALUE); 774 rm.add(put); 775 t.mutateRow(rm); 776 } catch (IOException ex) { 777 throw new RuntimeException(ex); 778 } 779 }); 780 } 781 782 private void testPreBatchMutate(TableName tableName, Runnable rn) throws Exception { 783 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 784 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 785 .setCoprocessor(WaitingForScanObserver.class.getName()).build(); 786 TEST_UTIL.getAdmin().createTable(tableDescriptor); 787 // Don't use waitTableAvailable(), because the scanner will mess up the co-processor 788 789 ExecutorService service = Executors.newFixedThreadPool(2); 790 service.execute(rn); 791 final List<Cell> cells = new ArrayList<>(); 792 service.execute(() -> { 793 try { 794 // waiting for update. 795 TimeUnit.SECONDS.sleep(3); 796 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { 797 Scan scan = new Scan(); 798 try (ResultScanner scanner = t.getScanner(scan)) { 799 for (Result r : scanner) { 800 cells.addAll(Arrays.asList(r.rawCells())); 801 } 802 } 803 } 804 } catch (IOException | InterruptedException ex) { 805 throw new RuntimeException(ex); 806 } 807 }); 808 service.shutdown(); 809 service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 810 assertEquals("The write is blocking by RegionObserver#postBatchMutate" 811 + ", so the data is invisible to reader", 0, cells.size()); 812 TEST_UTIL.deleteTable(tableName); 813 } 814 815 @Test 816 public void testLockLeakWithDelta() throws Exception, Throwable { 817 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 818 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 819 .setCoprocessor(WaitingForMultiMutationsObserver.class.getName()) 820 .setValue("hbase.rowlock.wait.duration", String.valueOf(5000)).build(); 821 TEST_UTIL.getAdmin().createTable(tableDescriptor); 822 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 823 824 // new a connection for lower retry number. 825 Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); 826 copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 827 try (Connection con = ConnectionFactory.createConnection(copy)) { 828 HRegion region = (HRegion) find(tableName); 829 region.setTimeoutForWriteLock(10); 830 ExecutorService putService = Executors.newSingleThreadExecutor(); 831 putService.execute(() -> { 832 try (Table table = con.getTable(tableName)) { 833 Put put = new Put(ROW); 834 put.addColumn(FAMILY, QUALIFIER, VALUE); 835 // the put will be blocked by WaitingForMultiMutationsObserver. 836 table.put(put); 837 } catch (IOException ex) { 838 throw new RuntimeException(ex); 839 } 840 }); 841 ExecutorService appendService = Executors.newSingleThreadExecutor(); 842 appendService.execute(() -> { 843 Append append = new Append(ROW); 844 append.addColumn(FAMILY, QUALIFIER, VALUE); 845 try (Table table = con.getTable(tableName)) { 846 table.append(append); 847 fail("The APPEND should fail because the target lock is blocked by previous put"); 848 } catch (Exception ex) { 849 } 850 }); 851 appendService.shutdown(); 852 appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 853 WaitingForMultiMutationsObserver observer = 854 find(tableName, WaitingForMultiMutationsObserver.class); 855 observer.latch.countDown(); 856 putService.shutdown(); 857 putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 858 try (Table table = con.getTable(tableName)) { 859 Result r = table.get(new Get(ROW)); 860 assertFalse(r.isEmpty()); 861 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE)); 862 } 863 } 864 HRegion region = (HRegion) find(tableName); 865 int readLockCount = region.getReadLockCount(); 866 LOG.info("readLockCount:" + readLockCount); 867 assertEquals(0, readLockCount); 868 } 869 870 @Test 871 public void testMultiRowMutations() throws Exception, Throwable { 872 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 873 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 874 .setCoprocessor(MultiRowMutationEndpoint.class.getName()) 875 .setCoprocessor(WaitingForMultiMutationsObserver.class.getName()) 876 .setValue("hbase.rowlock.wait.duration", String.valueOf(5000)).build(); 877 TEST_UTIL.getAdmin().createTable(tableDescriptor); 878 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 879 880 // new a connection for lower retry number. 881 Configuration copy = new Configuration(TEST_UTIL.getConfiguration()); 882 copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 883 try (Connection con = ConnectionFactory.createConnection(copy)) { 884 byte[] row = Bytes.toBytes("ROW-0"); 885 byte[] rowLocked = Bytes.toBytes("ROW-1"); 886 byte[] value0 = Bytes.toBytes("VALUE-0"); 887 byte[] value1 = Bytes.toBytes("VALUE-1"); 888 byte[] value2 = Bytes.toBytes("VALUE-2"); 889 assertNoLocks(tableName); 890 ExecutorService putService = Executors.newSingleThreadExecutor(); 891 putService.execute(() -> { 892 try (Table table = con.getTable(tableName)) { 893 Put put0 = new Put(rowLocked); 894 put0.addColumn(FAMILY, QUALIFIER, value0); 895 // the put will be blocked by WaitingForMultiMutationsObserver. 896 table.put(put0); 897 } catch (IOException ex) { 898 throw new RuntimeException(ex); 899 } 900 }); 901 ExecutorService cpService = Executors.newSingleThreadExecutor(); 902 AtomicBoolean exceptionDuringMutateRows = new AtomicBoolean(); 903 cpService.execute(() -> { 904 Put put1 = new Put(row); 905 Put put2 = new Put(rowLocked); 906 put1.addColumn(FAMILY, QUALIFIER, value1); 907 put2.addColumn(FAMILY, QUALIFIER, value2); 908 try (Table table = con.getTable(tableName)) { 909 MultiRowMutationProtos.MutateRowsRequest request = 910 MultiRowMutationProtos.MutateRowsRequest.newBuilder() 911 .addMutationRequest( 912 ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put1)) 913 .addMutationRequest( 914 ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put2)) 915 .build(); 916 table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class, ROW, ROW, 917 (MultiRowMutationProtos.MultiRowMutationService exe) -> { 918 ServerRpcController controller = new ServerRpcController(); 919 CoprocessorRpcUtils.BlockingRpcCallback< 920 MultiRowMutationProtos.MutateRowsResponse> rpcCallback = 921 new CoprocessorRpcUtils.BlockingRpcCallback<>(); 922 exe.mutateRows(controller, request, rpcCallback); 923 if ( 924 controller.failedOnException() 925 && !(controller.getFailedOn() instanceof UnknownProtocolException) 926 ) { 927 exceptionDuringMutateRows.set(true); 928 } 929 return rpcCallback.get(); 930 }); 931 } catch (Throwable ex) { 932 LOG.error("encountered " + ex); 933 } 934 }); 935 cpService.shutdown(); 936 cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 937 WaitingForMultiMutationsObserver observer = 938 find(tableName, WaitingForMultiMutationsObserver.class); 939 observer.latch.countDown(); 940 putService.shutdown(); 941 putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 942 try (Table table = con.getTable(tableName)) { 943 Get g0 = new Get(row); 944 Get g1 = new Get(rowLocked); 945 Result r0 = table.get(g0); 946 Result r1 = table.get(g1); 947 assertTrue(r0.isEmpty()); 948 assertFalse(r1.isEmpty()); 949 assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0)); 950 } 951 assertNoLocks(tableName); 952 if (!exceptionDuringMutateRows.get()) { 953 fail("This cp should fail because the target lock is blocked by previous put"); 954 } 955 } 956 } 957 958 /** 959 * A test case for issue HBASE-17482 After combile seqid with mvcc readpoint, seqid/mvcc is 960 * acquired and stamped onto cells in the append thread, a countdown latch is used to ensure that 961 * happened before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698) make 962 * the seqid/mvcc acquirement in handler thread and stamping in append thread No countdown latch 963 * to assure cells in memstore are stamped with seqid/mvcc. If cells without mvcc(A.K.A mvcc=0) 964 * are put into memstore, then a scanner with a smaller readpoint can see these data, which 965 * disobey the multi version concurrency control rules. This test case is to reproduce this 966 * scenario. 967 */ 968 @Test 969 public void testMVCCUsingMVCCPreAssign() throws IOException, InterruptedException { 970 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) { 971 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 972 // put two row first to init the scanner 973 Put put = new Put(Bytes.toBytes("0")); 974 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0")); 975 table.put(put); 976 put = new Put(Bytes.toBytes("00")); 977 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0")); 978 table.put(put); 979 Scan scan = new Scan(); 980 scan.setTimeRange(0, Long.MAX_VALUE); 981 scan.setCaching(1); 982 ResultScanner scanner = table.getScanner(scan); 983 int rowNum = scanner.next() != null ? 1 : 0; 984 // the started scanner shouldn't see the rows put below 985 for (int i = 1; i < 1000; i++) { 986 put = new Put(Bytes.toBytes(String.valueOf(i))); 987 put.setDurability(Durability.ASYNC_WAL); 988 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes(i)); 989 table.put(put); 990 } 991 for (Result result : scanner) { 992 rowNum++; 993 } 994 // scanner should only see two rows 995 assertEquals(2, rowNum); 996 scanner = table.getScanner(scan); 997 rowNum = 0; 998 for (Result result : scanner) { 999 rowNum++; 1000 } 1001 // the new scanner should see all rows 1002 assertEquals(1001, rowNum); 1003 } 1004 } 1005 1006 @Test 1007 public void testPutThenGetWithMultipleThreads() throws Exception { 1008 final int THREAD_NUM = 20; 1009 final int ROUND_NUM = 10; 1010 for (int round = 0; round < ROUND_NUM; round++) { 1011 ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM); 1012 final AtomicInteger successCnt = new AtomicInteger(0); 1013 try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { 1014 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 1015 1016 for (int i = 0; i < THREAD_NUM; i++) { 1017 final int index = i; 1018 Thread t = new Thread(new Runnable() { 1019 1020 @Override 1021 public void run() { 1022 final byte[] row = Bytes.toBytes("row-" + index); 1023 final byte[] value = Bytes.toBytes("v" + index); 1024 try { 1025 Put put = new Put(row); 1026 put.addColumn(FAMILY, QUALIFIER, value); 1027 ht.put(put); 1028 Get get = new Get(row); 1029 Result result = ht.get(get); 1030 byte[] returnedValue = result.getValue(FAMILY, QUALIFIER); 1031 if (Bytes.equals(value, returnedValue)) { 1032 successCnt.getAndIncrement(); 1033 } else { 1034 LOG.error("Should be equal but not, original value: " + Bytes.toString(value) 1035 + ", returned value: " 1036 + (returnedValue == null ? "null" : Bytes.toString(returnedValue))); 1037 } 1038 } catch (Throwable e) { 1039 // do nothing 1040 } 1041 } 1042 }); 1043 threads.add(t); 1044 } 1045 for (Thread t : threads) { 1046 t.start(); 1047 } 1048 for (Thread t : threads) { 1049 t.join(); 1050 } 1051 assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get()); 1052 } 1053 TEST_UTIL.deleteTable(tableName); 1054 } 1055 } 1056 1057 private static void assertNoLocks(final TableName tableName) 1058 throws IOException, InterruptedException { 1059 HRegion region = (HRegion) find(tableName); 1060 assertEquals(0, region.getLockedRows().size()); 1061 } 1062 1063 private static HRegion find(final TableName tableName) throws IOException, InterruptedException { 1064 HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName); 1065 List<HRegion> regions = rs.getRegions(tableName); 1066 assertEquals(1, regions.size()); 1067 return regions.get(0); 1068 } 1069 1070 private static <T extends RegionObserver> T find(final TableName tableName, Class<T> clz) 1071 throws IOException, InterruptedException { 1072 HRegion region = find(tableName); 1073 Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName()); 1074 assertTrue("The cp instance should be " + clz.getName() + ", current instance is " 1075 + cp.getClass().getName(), clz.isInstance(cp)); 1076 return clz.cast(cp); 1077 } 1078 1079 public static class WaitingForMultiMutationsObserver 1080 implements RegionCoprocessor, RegionObserver { 1081 final CountDownLatch latch = new CountDownLatch(1); 1082 1083 @Override 1084 public Optional<RegionObserver> getRegionObserver() { 1085 return Optional.of(this); 1086 } 1087 1088 @Override 1089 public void postBatchMutate(final ObserverContext<? extends RegionCoprocessorEnvironment> c, 1090 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1091 try { 1092 latch.await(); 1093 } catch (InterruptedException ex) { 1094 throw new IOException(ex); 1095 } 1096 } 1097 } 1098 1099 public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver { 1100 private final CountDownLatch latch = new CountDownLatch(1); 1101 1102 @Override 1103 public Optional<RegionObserver> getRegionObserver() { 1104 return Optional.of(this); 1105 } 1106 1107 @Override 1108 public void postBatchMutate(final ObserverContext<? extends RegionCoprocessorEnvironment> c, 1109 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1110 try { 1111 // waiting for scanner 1112 latch.await(); 1113 } catch (InterruptedException ex) { 1114 throw new IOException(ex); 1115 } 1116 } 1117 1118 @Override 1119 public RegionScanner postScannerOpen( 1120 final ObserverContext<? extends RegionCoprocessorEnvironment> e, final Scan scan, 1121 final RegionScanner s) throws IOException { 1122 latch.countDown(); 1123 return s; 1124 } 1125 } 1126 1127 static byte[] generateHugeValue(int size) { 1128 Random rand = ThreadLocalRandom.current(); 1129 byte[] value = new byte[size]; 1130 for (int i = 0; i < value.length; i++) { 1131 value[i] = (byte) rand.nextInt(256); 1132 } 1133 return value; 1134 } 1135 1136 @Test 1137 public void testScanWithBatchSizeReturnIncompleteCells() 1138 throws IOException, InterruptedException { 1139 TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName) 1140 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build()) 1141 .build(); 1142 try (Table table = TEST_UTIL.createTable(hd, null)) { 1143 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS); 1144 1145 Put put = new Put(ROW); 1146 put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024)); 1147 table.put(put); 1148 1149 put = new Put(ROW); 1150 put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024)); 1151 table.put(put); 1152 1153 for (int i = 2; i < 5; i++) { 1154 for (int version = 0; version < 2; version++) { 1155 put = new Put(ROW); 1156 put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024)); 1157 table.put(put); 1158 } 1159 } 1160 1161 Scan scan = new Scan(); 1162 scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(3) 1163 .setMaxResultSize(4 * 1024 * 1024); 1164 Result result; 1165 try (ResultScanner scanner = table.getScanner(scan)) { 1166 List<Result> list = new ArrayList<>(); 1167 /* 1168 * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; The 1169 * second scan rpc should return a result with 3 cells, because reach the batch limit = 3; 1170 * The mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the 1171 * moreResultsInRegion also would be false. Finally, the client should collect all the cells 1172 * into two result: 2+3 -> 3+2; 1173 */ 1174 while ((result = scanner.next()) != null) { 1175 list.add(result); 1176 } 1177 1178 Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum()); 1179 Assert.assertEquals(2, list.size()); 1180 Assert.assertEquals(3, list.get(0).size()); 1181 Assert.assertEquals(2, list.get(1).size()); 1182 } 1183 1184 scan = new Scan(); 1185 scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(2) 1186 .setMaxResultSize(4 * 1024 * 1024); 1187 try (ResultScanner scanner = table.getScanner(scan)) { 1188 List<Result> list = new ArrayList<>(); 1189 while ((result = scanner.next()) != null) { 1190 list.add(result); 1191 } 1192 Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum()); 1193 Assert.assertEquals(3, list.size()); 1194 Assert.assertEquals(2, list.get(0).size()); 1195 Assert.assertEquals(2, list.get(1).size()); 1196 Assert.assertEquals(1, list.get(2).size()); 1197 } 1198 } 1199 } 1200}