001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1; 021import static org.apache.hadoop.hbase.HBaseTestingUtil.fam2; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertTrue; 025import static org.junit.Assert.fail; 026 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.List; 031import java.util.Objects; 032import java.util.concurrent.CountDownLatch; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.concurrent.atomic.AtomicLong; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.FileSystem; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.hbase.Cell; 039import org.apache.hadoop.hbase.CellUtil; 040import org.apache.hadoop.hbase.CompareOperator; 041import org.apache.hadoop.hbase.HBaseClassTestRule; 042import org.apache.hadoop.hbase.HBaseTestingUtil; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.MultithreadedTestUtil; 045import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; 046import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.client.Append; 049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 051import org.apache.hadoop.hbase.client.Delete; 052import org.apache.hadoop.hbase.client.Durability; 053import org.apache.hadoop.hbase.client.Get; 054import org.apache.hadoop.hbase.client.Increment; 055import org.apache.hadoop.hbase.client.IsolationLevel; 056import org.apache.hadoop.hbase.client.Mutation; 057import org.apache.hadoop.hbase.client.Put; 058import org.apache.hadoop.hbase.client.RegionInfo; 059import org.apache.hadoop.hbase.client.RegionInfoBuilder; 060import org.apache.hadoop.hbase.client.Result; 061import org.apache.hadoop.hbase.client.RowMutations; 062import org.apache.hadoop.hbase.client.Scan; 063import org.apache.hadoop.hbase.client.TableDescriptor; 064import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 065import org.apache.hadoop.hbase.filter.BinaryComparator; 066import org.apache.hadoop.hbase.io.HeapSize; 067import org.apache.hadoop.hbase.io.hfile.BlockCache; 068import org.apache.hadoop.hbase.io.hfile.CacheConfig; 069import org.apache.hadoop.hbase.testclassification.LargeTests; 070import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; 071import org.apache.hadoop.hbase.util.Bytes; 072import org.apache.hadoop.hbase.wal.WAL; 073import org.junit.After; 074import org.junit.Before; 075import org.junit.ClassRule; 076import org.junit.Rule; 077import org.junit.Test; 078import org.junit.experimental.categories.Category; 079import org.junit.rules.TestName; 080import org.slf4j.Logger; 081import org.slf4j.LoggerFactory; 082 083/** 084 * Testing of HRegion.incrementColumnValue, HRegion.increment, and HRegion.append 085 */ 086@Category({ VerySlowRegionServerTests.class, LargeTests.class }) // Starts 100 threads 087public class TestAtomicOperation { 088 089 @ClassRule 090 public static final HBaseClassTestRule CLASS_RULE = 091 HBaseClassTestRule.forClass(TestAtomicOperation.class); 092 093 private static final Logger LOG = LoggerFactory.getLogger(TestAtomicOperation.class); 094 @Rule 095 public TestName name = new TestName(); 096 097 HRegion region = null; 098 private HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 099 100 // Test names 101 static byte[] tableName; 102 static final byte[] qual1 = Bytes.toBytes("qual1"); 103 static final byte[] qual2 = Bytes.toBytes("qual2"); 104 static final byte[] qual3 = Bytes.toBytes("qual3"); 105 static final byte[] value1 = Bytes.toBytes("value1"); 106 static final byte[] value2 = Bytes.toBytes("value2"); 107 static final byte[] row = Bytes.toBytes("rowA"); 108 static final byte[] row2 = Bytes.toBytes("rowB"); 109 110 @Before 111 public void setup() { 112 tableName = Bytes.toBytes(name.getMethodName()); 113 } 114 115 @After 116 public void teardown() throws IOException { 117 if (region != null) { 118 CacheConfig cacheConfig = region.getStores().get(0).getCacheConfig(); 119 region.close(); 120 WAL wal = region.getWAL(); 121 if (wal != null) { 122 wal.close(); 123 } 124 cacheConfig.getBlockCache().ifPresent(BlockCache::shutdown); 125 region = null; 126 } 127 } 128 129 ////////////////////////////////////////////////////////////////////////////// 130 // New tests that doesn't spin up a mini cluster but rather just test the 131 // individual code pieces in the HRegion. 132 ////////////////////////////////////////////////////////////////////////////// 133 134 /** 135 * Test basic append operation. More tests in 136 * @see org.apache.hadoop.hbase.client.TestFromClientSide#testAppend() 137 */ 138 @Test 139 public void testAppend() throws IOException { 140 initHRegion(tableName, name.getMethodName(), fam1); 141 String v1 = 142 "Ultimate Answer to the Ultimate Question of Life," + " The Universe, and Everything"; 143 String v2 = " is... 42."; 144 Append a = new Append(row); 145 a.setReturnResults(false); 146 a.addColumn(fam1, qual1, Bytes.toBytes(v1)); 147 a.addColumn(fam1, qual2, Bytes.toBytes(v2)); 148 assertTrue(region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE).isEmpty()); 149 a = new Append(row); 150 a.addColumn(fam1, qual1, Bytes.toBytes(v2)); 151 a.addColumn(fam1, qual2, Bytes.toBytes(v1)); 152 Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); 153 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1 + v2), result.getValue(fam1, qual1))); 154 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2 + v1), result.getValue(fam1, qual2))); 155 } 156 157 @Test 158 public void testAppendWithMultipleFamilies() throws IOException { 159 final byte[] fam3 = Bytes.toBytes("colfamily31"); 160 initHRegion(tableName, name.getMethodName(), fam1, fam2, fam3); 161 String v1 = "Appended"; 162 String v2 = "Value"; 163 164 Append a = new Append(row); 165 a.setReturnResults(false); 166 a.addColumn(fam1, qual1, Bytes.toBytes(v1)); 167 a.addColumn(fam2, qual2, Bytes.toBytes(v2)); 168 Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); 169 assertTrue("Expected an empty result but result contains " + result.size() + " keys", 170 result.isEmpty()); 171 172 a = new Append(row); 173 a.addColumn(fam2, qual2, Bytes.toBytes(v1)); 174 a.addColumn(fam1, qual1, Bytes.toBytes(v2)); 175 a.addColumn(fam3, qual3, Bytes.toBytes(v2)); 176 a.addColumn(fam1, qual2, Bytes.toBytes(v1)); 177 178 result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); 179 180 byte[] actualValue1 = result.getValue(fam1, qual1); 181 byte[] actualValue2 = result.getValue(fam2, qual2); 182 byte[] actualValue3 = result.getValue(fam3, qual3); 183 byte[] actualValue4 = result.getValue(fam1, qual2); 184 185 assertNotNull("Value1 should bot be null", actualValue1); 186 assertNotNull("Value2 should bot be null", actualValue2); 187 assertNotNull("Value3 should bot be null", actualValue3); 188 assertNotNull("Value4 should bot be null", actualValue4); 189 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1 + v2), actualValue1)); 190 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2 + v1), actualValue2)); 191 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2), actualValue3)); 192 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1), actualValue4)); 193 } 194 195 @Test 196 public void testAppendWithNonExistingFamily() throws IOException { 197 initHRegion(tableName, name.getMethodName(), fam1); 198 final String v1 = "Value"; 199 final Append a = new Append(row); 200 a.addColumn(fam1, qual1, Bytes.toBytes(v1)); 201 a.addColumn(fam2, qual2, Bytes.toBytes(v1)); 202 Result result = null; 203 try { 204 result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); 205 fail("Append operation should fail with NoSuchColumnFamilyException."); 206 } catch (NoSuchColumnFamilyException e) { 207 assertEquals(null, result); 208 } catch (Exception e) { 209 fail("Append operation should fail with NoSuchColumnFamilyException."); 210 } 211 } 212 213 @Test 214 public void testIncrementWithNonExistingFamily() throws IOException { 215 initHRegion(tableName, name.getMethodName(), fam1); 216 final Increment inc = new Increment(row); 217 inc.addColumn(fam1, qual1, 1); 218 inc.addColumn(fam2, qual2, 1); 219 inc.setDurability(Durability.ASYNC_WAL); 220 try { 221 region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE); 222 } catch (NoSuchColumnFamilyException e) { 223 final Get g = new Get(row); 224 final Result result = region.get(g); 225 assertEquals(null, result.getValue(fam1, qual1)); 226 assertEquals(null, result.getValue(fam2, qual2)); 227 } catch (Exception e) { 228 fail("Increment operation should fail with NoSuchColumnFamilyException."); 229 } 230 } 231 232 /** 233 * Test multi-threaded increments. 234 */ 235 @Test 236 public void testIncrementMultiThreads() throws IOException { 237 boolean fast = true; 238 LOG.info("Starting test testIncrementMultiThreads"); 239 // run a with mixed column families (1 and 3 versions) 240 initHRegion(tableName, name.getMethodName(), new int[] { 1, 3 }, fam1, fam2); 241 242 // Create 100 threads, each will increment by its own quantity. All 100 threads update the 243 // same row over two column families. 244 int numThreads = 100; 245 int incrementsPerThread = 1000; 246 Incrementer[] all = new Incrementer[numThreads]; 247 int expectedTotal = 0; 248 // create all threads 249 for (int i = 0; i < numThreads; i++) { 250 all[i] = new Incrementer(region, i, i, incrementsPerThread); 251 expectedTotal += (i * incrementsPerThread); 252 } 253 254 // run all threads 255 for (int i = 0; i < numThreads; i++) { 256 all[i].start(); 257 } 258 259 // wait for all threads to finish 260 for (int i = 0; i < numThreads; i++) { 261 try { 262 all[i].join(); 263 } catch (InterruptedException e) { 264 LOG.info("Ignored", e); 265 } 266 } 267 assertICV(row, fam1, qual1, expectedTotal, fast); 268 assertICV(row, fam1, qual2, expectedTotal * 2, fast); 269 assertICV(row, fam2, qual3, expectedTotal * 3, fast); 270 LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal); 271 } 272 273 private void assertICV(byte[] row, byte[] familiy, byte[] qualifier, long amount, boolean fast) 274 throws IOException { 275 // run a get and see? 276 Get get = new Get(row); 277 if (fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); 278 get.addColumn(familiy, qualifier); 279 Result result = region.get(get); 280 assertEquals(1, result.size()); 281 282 Cell kv = result.rawCells()[0]; 283 long r = Bytes.toLong(CellUtil.cloneValue(kv)); 284 assertEquals(amount, r); 285 } 286 287 private void initHRegion(byte[] tableName, String callingMethod, byte[]... families) 288 throws IOException { 289 initHRegion(tableName, callingMethod, null, families); 290 } 291 292 private void initHRegion(byte[] tableName, String callingMethod, int[] maxVersions, 293 byte[]... families) throws IOException { 294 TableDescriptorBuilder builder = 295 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)); 296 297 int i = 0; 298 for (byte[] family : families) { 299 ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(family) 300 .setMaxVersions(maxVersions != null ? maxVersions[i++] : 1).build(); 301 builder.setColumnFamily(familyDescriptor); 302 } 303 TableDescriptor tableDescriptor = builder.build(); 304 RegionInfo info = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(); 305 region = TEST_UTIL.createLocalHRegion(info, tableDescriptor); 306 } 307 308 /** 309 * A thread that makes increment calls always on the same row, this.row against two column 310 * families on this row. 311 */ 312 public static class Incrementer extends Thread { 313 314 private final Region region; 315 private final int numIncrements; 316 private final int amount; 317 318 public Incrementer(Region region, int threadNumber, int amount, int numIncrements) { 319 super("Incrementer." + threadNumber); 320 this.region = region; 321 this.numIncrements = numIncrements; 322 this.amount = amount; 323 setDaemon(true); 324 } 325 326 @Override 327 public void run() { 328 for (int i = 0; i < numIncrements; i++) { 329 try { 330 Increment inc = new Increment(row); 331 inc.addColumn(fam1, qual1, amount); 332 inc.addColumn(fam1, qual2, amount * 2); 333 inc.addColumn(fam2, qual3, amount * 3); 334 inc.setDurability(Durability.ASYNC_WAL); 335 Result result = region.increment(inc); 336 if (result != null) { 337 assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 2, 338 Bytes.toLong(result.getValue(fam1, qual2))); 339 assertTrue(result.getValue(fam2, qual3) != null); 340 assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 3, 341 Bytes.toLong(result.getValue(fam2, qual3))); 342 assertEquals(Bytes.toLong(result.getValue(fam1, qual1)) * 2, 343 Bytes.toLong(result.getValue(fam1, qual2))); 344 long fam1Increment = Bytes.toLong(result.getValue(fam1, qual1)) * 3; 345 long fam2Increment = Bytes.toLong(result.getValue(fam2, qual3)); 346 assertEquals("fam1=" + fam1Increment + ", fam2=" + fam2Increment, fam1Increment, 347 fam2Increment); 348 } 349 } catch (IOException e) { 350 e.printStackTrace(); 351 } 352 } 353 } 354 } 355 356 @Test 357 public void testAppendMultiThreads() throws IOException { 358 LOG.info("Starting test testAppendMultiThreads"); 359 // run a with mixed column families (1 and 3 versions) 360 initHRegion(tableName, name.getMethodName(), new int[] { 1, 3 }, fam1, fam2); 361 362 int numThreads = 100; 363 int opsPerThread = 100; 364 AtomicOperation[] all = new AtomicOperation[numThreads]; 365 final byte[] val = new byte[] { 1 }; 366 367 AtomicInteger failures = new AtomicInteger(0); 368 // create all threads 369 for (int i = 0; i < numThreads; i++) { 370 all[i] = new AtomicOperation(region, opsPerThread, null, failures) { 371 @Override 372 public void run() { 373 for (int i = 0; i < numOps; i++) { 374 try { 375 Append a = new Append(row); 376 a.addColumn(fam1, qual1, val); 377 a.addColumn(fam1, qual2, val); 378 a.addColumn(fam2, qual3, val); 379 a.setDurability(Durability.ASYNC_WAL); 380 region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); 381 382 Get g = new Get(row); 383 Result result = region.get(g); 384 assertEquals(result.getValue(fam1, qual1).length, 385 result.getValue(fam1, qual2).length); 386 assertEquals(result.getValue(fam1, qual1).length, 387 result.getValue(fam2, qual3).length); 388 } catch (IOException e) { 389 e.printStackTrace(); 390 failures.incrementAndGet(); 391 fail(); 392 } 393 } 394 } 395 }; 396 } 397 398 // run all threads 399 for (int i = 0; i < numThreads; i++) { 400 all[i].start(); 401 } 402 403 // wait for all threads to finish 404 for (int i = 0; i < numThreads; i++) { 405 try { 406 all[i].join(); 407 } catch (InterruptedException e) { 408 } 409 } 410 assertEquals(0, failures.get()); 411 Get g = new Get(row); 412 Result result = region.get(g); 413 assertEquals(10000, result.getValue(fam1, qual1).length); 414 assertEquals(10000, result.getValue(fam1, qual2).length); 415 assertEquals(10000, result.getValue(fam2, qual3).length); 416 } 417 418 /** 419 * Test multi-threaded row mutations. 420 */ 421 @Test 422 public void testRowMutationMultiThreads() throws IOException { 423 LOG.info("Starting test testRowMutationMultiThreads"); 424 initHRegion(tableName, name.getMethodName(), fam1); 425 426 // create 10 threads, each will alternate between adding and 427 // removing a column 428 int numThreads = 10; 429 int opsPerThread = 250; 430 AtomicOperation[] all = new AtomicOperation[numThreads]; 431 432 AtomicLong timeStamps = new AtomicLong(0); 433 AtomicInteger failures = new AtomicInteger(0); 434 // create all threads 435 for (int i = 0; i < numThreads; i++) { 436 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) { 437 @Override 438 public void run() { 439 boolean op = true; 440 for (int i = 0; i < numOps; i++) { 441 try { 442 // throw in some flushes 443 if (i % 10 == 0) { 444 synchronized (region) { 445 LOG.debug("flushing"); 446 region.flush(true); 447 if (i % 100 == 0) { 448 region.compact(false); 449 } 450 } 451 } 452 long ts = timeStamps.incrementAndGet(); 453 RowMutations rm = new RowMutations(row); 454 if (op) { 455 Put p = new Put(row, ts); 456 p.addColumn(fam1, qual1, value1); 457 p.setDurability(Durability.ASYNC_WAL); 458 rm.add(p); 459 Delete d = new Delete(row); 460 d.addColumns(fam1, qual2, ts); 461 d.setDurability(Durability.ASYNC_WAL); 462 rm.add(d); 463 } else { 464 Delete d = new Delete(row); 465 d.addColumns(fam1, qual1, ts); 466 d.setDurability(Durability.ASYNC_WAL); 467 rm.add(d); 468 Put p = new Put(row, ts); 469 p.addColumn(fam1, qual2, value2); 470 p.setDurability(Durability.ASYNC_WAL); 471 rm.add(p); 472 } 473 region.mutateRow(rm); 474 op ^= true; 475 // check: should always see exactly one column 476 Get g = new Get(row); 477 Result r = region.get(g); 478 if (r.size() != 1) { 479 LOG.debug(Objects.toString(r)); 480 failures.incrementAndGet(); 481 fail(); 482 } 483 } catch (IOException e) { 484 e.printStackTrace(); 485 failures.incrementAndGet(); 486 fail(); 487 } 488 } 489 } 490 }; 491 } 492 493 // run all threads 494 for (int i = 0; i < numThreads; i++) { 495 all[i].start(); 496 } 497 498 // wait for all threads to finish 499 for (int i = 0; i < numThreads; i++) { 500 try { 501 all[i].join(); 502 } catch (InterruptedException e) { 503 } 504 } 505 assertEquals(0, failures.get()); 506 } 507 508 /** 509 * Test multi-threaded region mutations. 510 */ 511 @Test 512 public void testMultiRowMutationMultiThreads() throws IOException { 513 514 LOG.info("Starting test testMultiRowMutationMultiThreads"); 515 initHRegion(tableName, name.getMethodName(), fam1); 516 517 // create 10 threads, each will alternate between adding and 518 // removing a column 519 int numThreads = 10; 520 int opsPerThread = 250; 521 AtomicOperation[] all = new AtomicOperation[numThreads]; 522 523 AtomicLong timeStamps = new AtomicLong(0); 524 AtomicInteger failures = new AtomicInteger(0); 525 final List<byte[]> rowsToLock = Arrays.asList(row, row2); 526 // create all threads 527 for (int i = 0; i < numThreads; i++) { 528 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) { 529 @Override 530 public void run() { 531 boolean op = true; 532 for (int i = 0; i < numOps; i++) { 533 try { 534 // throw in some flushes 535 if (i % 10 == 0) { 536 synchronized (region) { 537 LOG.debug("flushing"); 538 region.flush(true); 539 if (i % 100 == 0) { 540 region.compact(false); 541 } 542 } 543 } 544 long ts = timeStamps.incrementAndGet(); 545 List<Mutation> mrm = new ArrayList<>(); 546 if (op) { 547 Put p = new Put(row2, ts); 548 p.addColumn(fam1, qual1, value1); 549 p.setDurability(Durability.ASYNC_WAL); 550 mrm.add(p); 551 Delete d = new Delete(row); 552 d.addColumns(fam1, qual1, ts); 553 d.setDurability(Durability.ASYNC_WAL); 554 mrm.add(d); 555 } else { 556 Delete d = new Delete(row2); 557 d.addColumns(fam1, qual1, ts); 558 d.setDurability(Durability.ASYNC_WAL); 559 mrm.add(d); 560 Put p = new Put(row, ts); 561 p.setDurability(Durability.ASYNC_WAL); 562 p.addColumn(fam1, qual1, value2); 563 mrm.add(p); 564 } 565 region.mutateRowsWithLocks(mrm, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE); 566 op ^= true; 567 // check: should always see exactly one column 568 Scan s = new Scan().withStartRow(row); 569 RegionScanner rs = region.getScanner(s); 570 List<Cell> r = new ArrayList<>(); 571 while (rs.next(r)) 572 ; 573 rs.close(); 574 if (r.size() != 1) { 575 LOG.debug(Objects.toString(r)); 576 failures.incrementAndGet(); 577 fail(); 578 } 579 } catch (IOException e) { 580 e.printStackTrace(); 581 failures.incrementAndGet(); 582 fail(); 583 } 584 } 585 } 586 }; 587 } 588 589 // run all threads 590 for (int i = 0; i < numThreads; i++) { 591 all[i].start(); 592 } 593 594 // wait for all threads to finish 595 for (int i = 0; i < numThreads; i++) { 596 try { 597 all[i].join(); 598 } catch (InterruptedException e) { 599 } 600 } 601 assertEquals(0, failures.get()); 602 } 603 604 public static class AtomicOperation extends Thread { 605 protected final HRegion region; 606 protected final int numOps; 607 protected final AtomicLong timeStamps; 608 protected final AtomicInteger failures; 609 610 public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps, 611 AtomicInteger failures) { 612 this.region = region; 613 this.numOps = numOps; 614 this.timeStamps = timeStamps; 615 this.failures = failures; 616 } 617 } 618 619 private static CountDownLatch latch = new CountDownLatch(1); 620 621 private enum TestStep { 622 INIT, // initial put of 10 to set value of the cell 623 PUT_STARTED, // began doing a put of 50 to cell 624 PUT_COMPLETED, // put complete (released RowLock, but may not have advanced MVCC). 625 CHECKANDPUT_STARTED, // began checkAndPut: if 10 -> 11 626 CHECKANDPUT_COMPLETED // completed checkAndPut 627 // NOTE: at the end of these steps, the value of the cell should be 50, not 11! 628 } 629 630 private static volatile TestStep testStep = TestStep.INIT; 631 private final String family = "f1"; 632 633 /** 634 * Test written as a verifier for HBASE-7051, CheckAndPut should properly read MVCC. Moved into 635 * TestAtomicOperation from its original location, TestHBase7051 636 */ 637 @Test 638 public void testPutAndCheckAndPutInParallel() throws Exception { 639 Configuration conf = TEST_UTIL.getConfiguration(); 640 conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class); 641 TableDescriptorBuilder tableDescriptorBuilder = 642 TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())); 643 ColumnFamilyDescriptor columnFamilyDescriptor = 644 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).build(); 645 tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor); 646 this.region = TEST_UTIL.createLocalHRegion(tableDescriptorBuilder.build(), null, null); 647 Put[] puts = new Put[1]; 648 Put put = new Put(Bytes.toBytes("r1")); 649 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10")); 650 puts[0] = put; 651 652 region.batchMutate(puts); 653 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf); 654 ctx.addThread(new PutThread(ctx, region)); 655 ctx.addThread(new CheckAndPutThread(ctx, region)); 656 ctx.startThreads(); 657 while (testStep != TestStep.CHECKANDPUT_COMPLETED) { 658 Thread.sleep(100); 659 } 660 ctx.stop(); 661 Scan s = new Scan(); 662 RegionScanner scanner = region.getScanner(s); 663 List<Cell> results = new ArrayList<>(); 664 ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(2).build(); 665 scanner.next(results, scannerContext); 666 for (Cell keyValue : results) { 667 assertEquals("50", Bytes.toString(CellUtil.cloneValue(keyValue))); 668 } 669 } 670 671 private class PutThread extends TestThread { 672 private Region region; 673 674 PutThread(TestContext ctx, Region region) { 675 super(ctx); 676 this.region = region; 677 } 678 679 @Override 680 public void doWork() throws Exception { 681 Put[] puts = new Put[1]; 682 Put put = new Put(Bytes.toBytes("r1")); 683 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50")); 684 puts[0] = put; 685 testStep = TestStep.PUT_STARTED; 686 region.batchMutate(puts); 687 } 688 } 689 690 private class CheckAndPutThread extends TestThread { 691 private Region region; 692 693 CheckAndPutThread(TestContext ctx, Region region) { 694 super(ctx); 695 this.region = region; 696 } 697 698 @Override 699 public void doWork() throws Exception { 700 Put[] puts = new Put[1]; 701 Put put = new Put(Bytes.toBytes("r1")); 702 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11")); 703 puts[0] = put; 704 while (testStep != TestStep.PUT_COMPLETED) { 705 Thread.sleep(100); 706 } 707 testStep = TestStep.CHECKANDPUT_STARTED; 708 region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"), 709 CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put); 710 testStep = TestStep.CHECKANDPUT_COMPLETED; 711 } 712 } 713 714 public static class MockHRegion extends HRegion { 715 716 public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf, 717 final RegionInfo regionInfo, final TableDescriptor htd, RegionServerServices rsServices) { 718 super(tableDir, log, fs, conf, regionInfo, htd, rsServices); 719 } 720 721 @Override 722 protected RowLock getRowLockInternal(final byte[] row, boolean readLock, 723 final RowLock prevRowlock) throws IOException { 724 if (testStep == TestStep.CHECKANDPUT_STARTED) { 725 latch.countDown(); 726 } 727 return new WrappedRowLock(super.getRowLockInternal(row, readLock, null)); 728 } 729 730 public class WrappedRowLock implements RowLock { 731 732 private final RowLock rowLock; 733 734 private WrappedRowLock(RowLock rowLock) { 735 this.rowLock = rowLock; 736 } 737 738 @Override 739 public void release() { 740 if (testStep == TestStep.INIT) { 741 this.rowLock.release(); 742 return; 743 } 744 745 if (testStep == TestStep.PUT_STARTED) { 746 try { 747 testStep = TestStep.PUT_COMPLETED; 748 this.rowLock.release(); 749 // put has been written to the memstore and the row lock has been released, but the 750 // MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of 751 // operations would cause the non-atomicity to show up: 752 // 1) Put releases row lock (where we are now) 753 // 2) CheckAndPut grabs row lock and reads the value prior to the put (10) 754 // because the MVCC has not advanced 755 // 3) Put advances MVCC 756 // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock 757 // (see below), and then wait some more to give the checkAndPut time to read the old 758 // value. 759 latch.await(); 760 Thread.sleep(1000); 761 } catch (InterruptedException e) { 762 Thread.currentThread().interrupt(); 763 } 764 } else if (testStep == TestStep.CHECKANDPUT_STARTED) { 765 this.rowLock.release(); 766 } 767 } 768 } 769 } 770}