001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025 026import java.io.IOException; 027import java.nio.ByteBuffer; 028import java.util.Arrays; 029import java.util.List; 030import java.util.concurrent.CyclicBarrier; 031import java.util.concurrent.atomic.AtomicInteger; 032import java.util.concurrent.atomic.AtomicReference; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseConfiguration; 035import org.apache.hadoop.hbase.io.ByteBuffAllocator; 036import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 037import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; 038import org.apache.hadoop.hbase.io.hfile.BlockType; 039import org.apache.hadoop.hbase.io.hfile.Cacheable; 040import org.apache.hadoop.hbase.io.hfile.HFileBlock; 041import org.apache.hadoop.hbase.io.hfile.HFileContext; 042import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 043import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread; 044import org.apache.hadoop.hbase.nio.ByteBuff; 045import org.apache.hadoop.hbase.nio.RefCnt; 046import org.apache.hadoop.hbase.testclassification.IOTests; 047import org.apache.hadoop.hbase.testclassification.SmallTests; 048import org.junit.ClassRule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051 052@Category({ IOTests.class, SmallTests.class }) 053public class TestBucketCacheRefCnt { 054 055 @ClassRule 056 public static final HBaseClassTestRule CLASS_RULE = 057 HBaseClassTestRule.forClass(TestBucketCacheRefCnt.class); 058 059 private static final String IO_ENGINE = "offheap"; 060 private static final long CAPACITY_SIZE = 32 * 1024 * 1024; 061 private static final int BLOCK_SIZE = 1024; 062 private static final int[] BLOCK_SIZE_ARRAY = 063 new int[] { 64, 128, 256, 512, 1024, 2048, 4096, 8192 }; 064 private static final String PERSISTENCE_PATH = null; 065 private static final HFileContext CONTEXT = new HFileContextBuilder().build(); 066 067 private BucketCache cache; 068 069 private static BucketCache create(int writerSize, int queueSize) throws IOException { 070 return new BucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize, 071 queueSize, PERSISTENCE_PATH); 072 } 073 074 private static MyBucketCache createMyBucketCache(int writerSize, int queueSize) 075 throws IOException { 076 return new MyBucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize, 077 queueSize, PERSISTENCE_PATH); 078 } 079 080 private static MyBucketCache2 createMyBucketCache2(int writerSize, int queueSize) 081 throws IOException { 082 return new MyBucketCache2(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize, 083 queueSize, PERSISTENCE_PATH); 084 } 085 086 private static HFileBlock createBlock(int offset, int size) { 087 return createBlock(offset, size, ByteBuffAllocator.HEAP); 088 } 089 090 private static HFileBlock createBlock(int offset, int size, ByteBuffAllocator alloc) { 091 return new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(ByteBuffer.allocate(size)), 092 HFileBlock.FILL_HEADER, offset, 52, size, CONTEXT, alloc); 093 } 094 095 private static BlockCacheKey createKey(String hfileName, long offset) { 096 return new BlockCacheKey(hfileName, offset); 097 } 098 099 private void disableWriter() { 100 if (cache != null) { 101 for (WriterThread wt : cache.writerThreads) { 102 wt.disableWriter(); 103 wt.interrupt(); 104 } 105 } 106 } 107 108 @org.junit.Ignore 109 @Test // Disabled by HBASE-24079. Reenable issue HBASE-24082 110 // Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2> 111 public void testBlockInRAMCache() throws IOException { 112 cache = create(1, 1000); 113 disableWriter(); 114 final String prefix = "testBlockInRamCache"; 115 try { 116 for (int i = 0; i < 10; i++) { 117 HFileBlock blk = createBlock(i, 1020); 118 BlockCacheKey key = createKey(prefix, i); 119 assertEquals(1, blk.refCnt()); 120 cache.cacheBlock(key, blk); 121 assertEquals(i + 1, cache.getBlockCount()); 122 assertEquals(2, blk.refCnt()); 123 124 Cacheable block = cache.getBlock(key, false, false, false); 125 try { 126 assertEquals(3, blk.refCnt()); 127 assertEquals(3, block.refCnt()); 128 assertEquals(blk, block); 129 } finally { 130 block.release(); 131 } 132 assertEquals(2, blk.refCnt()); 133 assertEquals(2, block.refCnt()); 134 } 135 136 for (int i = 0; i < 10; i++) { 137 BlockCacheKey key = createKey(prefix, i); 138 Cacheable blk = cache.getBlock(key, false, false, false); 139 assertEquals(3, blk.refCnt()); 140 assertFalse(blk.release()); 141 assertEquals(2, blk.refCnt()); 142 143 assertTrue(cache.evictBlock(key)); 144 assertEquals(1, blk.refCnt()); 145 assertTrue(blk.release()); 146 assertEquals(0, blk.refCnt()); 147 } 148 } finally { 149 cache.shutdown(); 150 } 151 } 152 153 private static void waitUntilFlushedToCache(BucketCache bucketCache, BlockCacheKey blockCacheKey) 154 throws InterruptedException { 155 while ( 156 !bucketCache.backingMap.containsKey(blockCacheKey) 157 || bucketCache.ramCache.containsKey(blockCacheKey) 158 ) { 159 Thread.sleep(100); 160 } 161 Thread.sleep(1000); 162 } 163 164 @Test 165 public void testBlockInBackingMap() throws Exception { 166 ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true); 167 cache = create(1, 1000); 168 try { 169 HFileBlock blk = createBlock(200, 1020, alloc); 170 BlockCacheKey key = createKey("testHFile-00", 200); 171 cache.cacheBlock(key, blk); 172 waitUntilFlushedToCache(cache, key); 173 assertEquals(1, blk.refCnt()); 174 175 Cacheable block = cache.getBlock(key, false, false, false); 176 assertTrue(block instanceof HFileBlock); 177 assertTrue(((HFileBlock) block).getByteBuffAllocator() == alloc); 178 assertEquals(2, block.refCnt()); 179 180 block.retain(); 181 assertEquals(3, block.refCnt()); 182 183 Cacheable newBlock = cache.getBlock(key, false, false, false); 184 assertTrue(newBlock instanceof HFileBlock); 185 assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc); 186 assertEquals(4, newBlock.refCnt()); 187 188 // release the newBlock 189 assertFalse(newBlock.release()); 190 assertEquals(3, newBlock.refCnt()); 191 assertEquals(3, block.refCnt()); 192 193 // Evict the key 194 cache.evictBlock(key); 195 assertEquals(2, block.refCnt()); 196 197 // Evict again, shouldn't change the refCnt. 198 cache.evictBlock(key); 199 assertEquals(2, block.refCnt()); 200 201 assertFalse(block.release()); 202 assertEquals(1, block.refCnt()); 203 204 /** 205 * The key was evicted from {@link BucketCache#backingMap} and {@link BucketCache#ramCache}, 206 * so {@link BucketCache#getBlock} return null. 207 */ 208 Cacheable newestBlock = cache.getBlock(key, false, false, false); 209 assertNull(newestBlock); 210 assertEquals(1, block.refCnt()); 211 assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc); 212 213 // Release the block 214 assertTrue(block.release()); 215 assertEquals(0, block.refCnt()); 216 assertEquals(0, newBlock.refCnt()); 217 } finally { 218 cache.shutdown(); 219 } 220 } 221 222 @Test 223 public void testInBucketCache() throws IOException { 224 ByteBuffAllocator alloc = ByteBuffAllocator.create(HBaseConfiguration.create(), true); 225 cache = create(1, 1000); 226 try { 227 HFileBlock blk = createBlock(200, 1020, alloc); 228 BlockCacheKey key = createKey("testHFile-00", 200); 229 cache.cacheBlock(key, blk); 230 assertTrue(blk.refCnt() == 1 || blk.refCnt() == 2); 231 232 Cacheable block1 = cache.getBlock(key, false, false, false); 233 assertTrue(block1.refCnt() >= 2); 234 assertTrue(((HFileBlock) block1).getByteBuffAllocator() == alloc); 235 236 Cacheable block2 = cache.getBlock(key, false, false, false); 237 assertTrue(((HFileBlock) block2).getByteBuffAllocator() == alloc); 238 assertTrue(block2.refCnt() >= 3); 239 240 cache.evictBlock(key); 241 assertTrue(blk.refCnt() >= 1); 242 assertTrue(block1.refCnt() >= 2); 243 assertTrue(block2.refCnt() >= 2); 244 245 // Get key again 246 Cacheable block3 = cache.getBlock(key, false, false, false); 247 if (block3 != null) { 248 assertTrue(((HFileBlock) block3).getByteBuffAllocator() == alloc); 249 assertTrue(block3.refCnt() >= 3); 250 assertFalse(block3.release()); 251 } 252 253 blk.release(); 254 boolean ret1 = block1.release(); 255 boolean ret2 = block2.release(); 256 assertTrue(ret1 || ret2); 257 assertEquals(0, blk.refCnt()); 258 assertEquals(0, block1.refCnt()); 259 assertEquals(0, block2.refCnt()); 260 } finally { 261 cache.shutdown(); 262 } 263 } 264 265 @Test 266 public void testMarkStaleAsEvicted() throws Exception { 267 cache = create(1, 1000); 268 try { 269 HFileBlock blk = createBlock(200, 1020); 270 BlockCacheKey key = createKey("testMarkStaleAsEvicted", 200); 271 cache.cacheBlock(key, blk); 272 waitUntilFlushedToCache(cache, key); 273 assertEquals(1, blk.refCnt()); 274 assertNotNull(cache.backingMap.get(key)); 275 assertEquals(1, cache.backingMap.get(key).refCnt()); 276 277 // RPC reference this cache. 278 Cacheable block1 = cache.getBlock(key, false, false, false); 279 assertEquals(2, block1.refCnt()); 280 BucketEntry be1 = cache.backingMap.get(key); 281 assertNotNull(be1); 282 assertEquals(2, be1.refCnt()); 283 284 // We've some RPC reference, so it won't have any effect. 285 assertFalse(cache.evictBucketEntryIfNoRpcReferenced(key, be1)); 286 assertEquals(2, block1.refCnt()); 287 assertEquals(2, cache.backingMap.get(key).refCnt()); 288 289 // Release the RPC reference. 290 block1.release(); 291 assertEquals(1, block1.refCnt()); 292 assertEquals(1, cache.backingMap.get(key).refCnt()); 293 294 // Mark the stale as evicted again, it'll do the de-allocation. 295 assertTrue(cache.evictBucketEntryIfNoRpcReferenced(key, be1)); 296 assertEquals(0, block1.refCnt()); 297 assertNull(cache.backingMap.get(key)); 298 assertEquals(0, cache.size()); 299 } finally { 300 cache.shutdown(); 301 } 302 } 303 304 /** 305 * <pre> 306 * This test is for HBASE-26281, 307 * test two threads for replacing Block and getting Block execute concurrently. 308 * The threads sequence is: 309 * 1. Block1 was cached successfully,the {@link RefCnt} of Block1 is 1. 310 * 2. Thread1 caching the same {@link BlockCacheKey} with Block2 satisfied 311 * {@link BlockCacheUtil#shouldReplaceExistingCacheBlock}, so Block2 would 312 * replace Block1, but thread1 stopping before {@link BucketCache#cacheBlockWithWaitInternal} 313 * 3. Thread2 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey}, 314 * which returned Block1, the {@link RefCnt} of Block1 is 2. 315 * 4. Thread1 continues caching Block2, in {@link BucketCache.WriterThread#putIntoBackingMap}, 316 * the old Block1 is freed directly which {@link RefCnt} is 2, but the Block1 is still used 317 * by Thread2 and the content of Block1 would be overwritten after it is freed, which may 318 * cause a serious error. 319 * </pre> 320 */ 321 @Test 322 public void testReplacingBlockAndGettingBlockConcurrently() throws Exception { 323 ByteBuffAllocator byteBuffAllocator = 324 ByteBuffAllocator.create(HBaseConfiguration.create(), true); 325 final MyBucketCache myBucketCache = createMyBucketCache(1, 1000); 326 try { 327 HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator); 328 final BlockCacheKey blockCacheKey = createKey("testTwoThreadConcurrent", 200); 329 myBucketCache.cacheBlock(blockCacheKey, hfileBlock); 330 waitUntilFlushedToCache(myBucketCache, blockCacheKey); 331 assertEquals(1, hfileBlock.refCnt()); 332 333 assertTrue(!myBucketCache.ramCache.containsKey(blockCacheKey)); 334 final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(); 335 Thread cacheBlockThread = new Thread(() -> { 336 try { 337 HFileBlock newHFileBlock = createBlock(200, 1020, byteBuffAllocator); 338 myBucketCache.cacheBlock(blockCacheKey, newHFileBlock); 339 waitUntilFlushedToCache(myBucketCache, blockCacheKey); 340 341 } catch (Throwable exception) { 342 exceptionRef.set(exception); 343 } 344 }); 345 cacheBlockThread.setName(MyBucketCache.CACHE_BLOCK_THREAD_NAME); 346 cacheBlockThread.start(); 347 348 String oldThreadName = Thread.currentThread().getName(); 349 HFileBlock gotHFileBlock = null; 350 try { 351 352 Thread.currentThread().setName(MyBucketCache.GET_BLOCK_THREAD_NAME); 353 354 gotHFileBlock = (HFileBlock) (myBucketCache.getBlock(blockCacheKey, false, false, false)); 355 assertTrue(gotHFileBlock.equals(hfileBlock)); 356 assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator); 357 assertEquals(2, gotHFileBlock.refCnt()); 358 /** 359 * Release the second cyclicBarrier.await in 360 * {@link MyBucketCache#cacheBlockWithWaitInternal} 361 */ 362 myBucketCache.cyclicBarrier.await(); 363 364 } finally { 365 Thread.currentThread().setName(oldThreadName); 366 } 367 368 cacheBlockThread.join(); 369 assertTrue(exceptionRef.get() == null); 370 assertEquals(1, gotHFileBlock.refCnt()); 371 assertTrue(gotHFileBlock.equals(hfileBlock)); 372 assertTrue(myBucketCache.overwiteByteBuff == null); 373 assertTrue(myBucketCache.freeBucketEntryCounter.get() == 0); 374 375 gotHFileBlock.release(); 376 assertEquals(0, gotHFileBlock.refCnt()); 377 assertTrue(myBucketCache.overwiteByteBuff != null); 378 assertTrue(myBucketCache.freeBucketEntryCounter.get() == 1); 379 assertTrue(myBucketCache.replaceCounter.get() == 1); 380 assertTrue(myBucketCache.blockEvictCounter.get() == 1); 381 } finally { 382 myBucketCache.shutdown(); 383 } 384 385 } 386 387 /** 388 * <pre> 389 * This test also is for HBASE-26281, 390 * test three threads for evicting Block,caching Block and getting Block 391 * execute concurrently. 392 * 1. Thread1 caching Block1, stopping after {@link BucketCache.WriterThread#putIntoBackingMap}, 393 * the {@link RefCnt} of Block1 is 1. 394 * 2. Thread2 invoking {@link BucketCache#evictBlock} with the same {@link BlockCacheKey}, 395 * but stopping after {@link BucketCache#removeFromRamCache}. 396 * 3. Thread3 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey}, 397 * which returned Block1, the {@link RefCnt} of Block1 is 2. 398 * 4. Thread1 continues caching block1,but finding that {@link BucketCache.RAMCache#remove} 399 * returning false, so invoking {@link BucketCache#blockEvicted} to free the the Block1 400 * directly which {@link RefCnt} is 2 and the Block1 is still used by Thread3. 401 * </pre> 402 */ 403 @Test 404 public void testEvictingBlockCachingBlockGettingBlockConcurrently() throws Exception { 405 ByteBuffAllocator byteBuffAllocator = 406 ByteBuffAllocator.create(HBaseConfiguration.create(), true); 407 final MyBucketCache2 myBucketCache2 = createMyBucketCache2(1, 1000); 408 try { 409 final HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator); 410 final BlockCacheKey blockCacheKey = createKey("testThreeThreadConcurrent", 200); 411 final AtomicReference<Throwable> cacheBlockThreadExceptionRef = 412 new AtomicReference<Throwable>(); 413 Thread cacheBlockThread = new Thread(() -> { 414 try { 415 myBucketCache2.cacheBlock(blockCacheKey, hfileBlock); 416 /** 417 * Wait for Caching Block completed. 418 */ 419 myBucketCache2.writeThreadDoneCyclicBarrier.await(); 420 } catch (Throwable exception) { 421 cacheBlockThreadExceptionRef.set(exception); 422 } 423 }); 424 cacheBlockThread.setName(MyBucketCache2.CACHE_BLOCK_THREAD_NAME); 425 cacheBlockThread.start(); 426 427 final AtomicReference<Throwable> evictBlockThreadExceptionRef = 428 new AtomicReference<Throwable>(); 429 Thread evictBlockThread = new Thread(() -> { 430 try { 431 myBucketCache2.evictBlock(blockCacheKey); 432 } catch (Throwable exception) { 433 evictBlockThreadExceptionRef.set(exception); 434 } 435 }); 436 evictBlockThread.setName(MyBucketCache2.EVICT_BLOCK_THREAD_NAME); 437 evictBlockThread.start(); 438 439 String oldThreadName = Thread.currentThread().getName(); 440 HFileBlock gotHFileBlock = null; 441 try { 442 Thread.currentThread().setName(MyBucketCache2.GET_BLOCK_THREAD_NAME); 443 gotHFileBlock = (HFileBlock) (myBucketCache2.getBlock(blockCacheKey, false, false, false)); 444 assertTrue(gotHFileBlock.equals(hfileBlock)); 445 assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator); 446 assertEquals(2, gotHFileBlock.refCnt()); 447 try { 448 /** 449 * Release the second cyclicBarrier.await in {@link MyBucketCache2#putIntoBackingMap} for 450 * {@link BucketCache.WriterThread},getBlock completed,{@link BucketCache.WriterThread} 451 * could continue. 452 */ 453 myBucketCache2.putCyclicBarrier.await(); 454 } catch (Throwable e) { 455 throw new RuntimeException(e); 456 } 457 458 } finally { 459 Thread.currentThread().setName(oldThreadName); 460 } 461 462 cacheBlockThread.join(); 463 evictBlockThread.join(); 464 assertTrue(cacheBlockThreadExceptionRef.get() == null); 465 assertTrue(evictBlockThreadExceptionRef.get() == null); 466 467 assertTrue(gotHFileBlock.equals(hfileBlock)); 468 assertEquals(1, gotHFileBlock.refCnt()); 469 assertTrue(myBucketCache2.overwiteByteBuff == null); 470 assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 0); 471 472 gotHFileBlock.release(); 473 assertEquals(0, gotHFileBlock.refCnt()); 474 assertTrue(myBucketCache2.overwiteByteBuff != null); 475 assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 1); 476 assertTrue(myBucketCache2.blockEvictCounter.get() == 1); 477 } finally { 478 myBucketCache2.shutdown(); 479 } 480 481 } 482 483 static class MyBucketCache extends BucketCache { 484 private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread"; 485 private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread"; 486 487 private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); 488 private final AtomicInteger replaceCounter = new AtomicInteger(0); 489 private final AtomicInteger blockEvictCounter = new AtomicInteger(0); 490 private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0); 491 private ByteBuff overwiteByteBuff = null; 492 493 public MyBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 494 int writerThreadNum, int writerQLen, String persistencePath) throws IOException { 495 super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 496 persistencePath); 497 } 498 499 /** 500 * Simulate the Block could be replaced. 501 */ 502 @Override 503 protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) { 504 replaceCounter.incrementAndGet(); 505 return true; 506 } 507 508 @Override 509 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, 510 boolean updateCacheMetrics) { 511 if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) { 512 /** 513 * Wait the first cyclicBarrier.await() in {@link MyBucketCache#cacheBlockWithWaitInternal}, 514 * so the {@link BucketCache#getBlock} is executed after the {@link BucketEntry#isRpcRef} 515 * checking. 516 */ 517 try { 518 cyclicBarrier.await(); 519 } catch (Throwable e) { 520 throw new RuntimeException(e); 521 } 522 } 523 Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics); 524 return result; 525 } 526 527 @Override 528 protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, 529 boolean inMemory, boolean wait) { 530 if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) { 531 /** 532 * Wait the cyclicBarrier.await() in {@link MyBucketCache#getBlock} 533 */ 534 try { 535 cyclicBarrier.await(); 536 } catch (Throwable e) { 537 throw new RuntimeException(e); 538 } 539 } 540 if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) { 541 /** 542 * Wait the cyclicBarrier.await() in 543 * {@link TestBucketCacheRefCnt#testReplacingBlockAndGettingBlockConcurrently} for 544 * {@link MyBucketCache#getBlock} and Assert completed. 545 */ 546 try { 547 cyclicBarrier.await(); 548 } catch (Throwable e) { 549 throw new RuntimeException(e); 550 } 551 } 552 super.cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 553 } 554 555 @Override 556 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber, 557 boolean evictedByEvictionProcess) { 558 blockEvictCounter.incrementAndGet(); 559 super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess); 560 } 561 562 /** 563 * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the 564 * {@link BucketEntry} is freed. 565 */ 566 @Override 567 void freeBucketEntry(BucketEntry bucketEntry) { 568 freeBucketEntryCounter.incrementAndGet(); 569 super.freeBucketEntry(bucketEntry); 570 this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry); 571 try { 572 this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset()); 573 } catch (IOException e) { 574 throw new RuntimeException(e); 575 } 576 } 577 } 578 579 static class MyBucketCache2 extends BucketCache { 580 private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread"; 581 private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread"; 582 private static final String EVICT_BLOCK_THREAD_NAME = "_evictBlockThread"; 583 584 private final CyclicBarrier getCyclicBarrier = new CyclicBarrier(2); 585 private final CyclicBarrier evictCyclicBarrier = new CyclicBarrier(2); 586 private final CyclicBarrier putCyclicBarrier = new CyclicBarrier(2); 587 /** 588 * This is used for {@link BucketCache.WriterThread},{@link #CACHE_BLOCK_THREAD_NAME} and 589 * {@link #EVICT_BLOCK_THREAD_NAME},waiting for caching block completed. 590 */ 591 private final CyclicBarrier writeThreadDoneCyclicBarrier = new CyclicBarrier(3); 592 private final AtomicInteger blockEvictCounter = new AtomicInteger(0); 593 private final AtomicInteger removeRamCounter = new AtomicInteger(0); 594 private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0); 595 private ByteBuff overwiteByteBuff = null; 596 597 public MyBucketCache2(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 598 int writerThreadNum, int writerQLen, String persistencePath) throws IOException { 599 super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 600 persistencePath); 601 } 602 603 @Override 604 protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { 605 super.putIntoBackingMap(key, bucketEntry); 606 /** 607 * The {@link BucketCache.WriterThread} wait for evictCyclicBarrier.await before 608 * {@link MyBucketCache2#removeFromRamCache} for {@link #EVICT_BLOCK_THREAD_NAME} 609 */ 610 try { 611 evictCyclicBarrier.await(); 612 } catch (Throwable e) { 613 throw new RuntimeException(e); 614 } 615 616 /** 617 * Wait the cyclicBarrier.await() in 618 * {@link TestBucketCacheRefCnt#testEvictingBlockCachingBlockGettingBlockConcurrently} for 619 * {@link MyBucketCache#getBlock} and Assert completed. 620 */ 621 try { 622 putCyclicBarrier.await(); 623 } catch (Throwable e) { 624 throw new RuntimeException(e); 625 } 626 } 627 628 @Override 629 void doDrain(List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException { 630 super.doDrain(entries, metaBuff); 631 if (entries.size() > 0) { 632 /** 633 * Caching Block completed,release {@link #GET_BLOCK_THREAD_NAME} and 634 * {@link #EVICT_BLOCK_THREAD_NAME}. 635 */ 636 try { 637 writeThreadDoneCyclicBarrier.await(); 638 } catch (Throwable e) { 639 throw new RuntimeException(e); 640 } 641 } 642 643 } 644 645 @Override 646 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, 647 boolean updateCacheMetrics) { 648 if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) { 649 /** 650 * Wait for second getCyclicBarrier.await in {@link MyBucketCache2#removeFromRamCache} after 651 * {@link BucketCache#removeFromRamCache}. 652 */ 653 try { 654 getCyclicBarrier.await(); 655 } catch (Throwable e) { 656 throw new RuntimeException(e); 657 } 658 } 659 Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics); 660 return result; 661 } 662 663 @Override 664 protected boolean removeFromRamCache(BlockCacheKey cacheKey) { 665 boolean firstTime = false; 666 if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) { 667 int count = this.removeRamCounter.incrementAndGet(); 668 firstTime = (count == 1); 669 if (firstTime) { 670 /** 671 * The {@link #EVICT_BLOCK_THREAD_NAME} wait for evictCyclicBarrier.await after 672 * {@link BucketCache#putIntoBackingMap}. 673 */ 674 try { 675 evictCyclicBarrier.await(); 676 } catch (Throwable e) { 677 throw new RuntimeException(e); 678 } 679 } 680 } 681 boolean result = super.removeFromRamCache(cacheKey); 682 if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) { 683 if (firstTime) { 684 /** 685 * Wait for getCyclicBarrier.await before {@link BucketCache#getBlock}. 686 */ 687 try { 688 getCyclicBarrier.await(); 689 } catch (Throwable e) { 690 throw new RuntimeException(e); 691 } 692 /** 693 * Wait for Caching Block completed, after Caching Block completed, evictBlock could 694 * continue. 695 */ 696 try { 697 writeThreadDoneCyclicBarrier.await(); 698 } catch (Throwable e) { 699 throw new RuntimeException(e); 700 } 701 } 702 } 703 704 return result; 705 } 706 707 @Override 708 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber, 709 boolean evictedByEvictionProcess) { 710 /** 711 * This is only invoked by {@link BucketCache.WriterThread}. {@link MyMyBucketCache2} create 712 * only one {@link BucketCache.WriterThread}. 713 */ 714 assertTrue(Thread.currentThread() == this.writerThreads[0]); 715 716 blockEvictCounter.incrementAndGet(); 717 super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber, evictedByEvictionProcess); 718 } 719 720 /** 721 * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the 722 * {@link BucketEntry} is freed. 723 */ 724 @Override 725 void freeBucketEntry(BucketEntry bucketEntry) { 726 freeBucketEntryCounter.incrementAndGet(); 727 super.freeBucketEntry(bucketEntry); 728 this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry); 729 try { 730 this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset()); 731 } catch (IOException e) { 732 throw new RuntimeException(e); 733 } 734 } 735 } 736 737 private static ByteBuff getOverwriteByteBuff(BucketEntry bucketEntry) { 738 int byteSize = bucketEntry.getLength(); 739 byte[] data = new byte[byteSize]; 740 Arrays.fill(data, (byte) 0xff); 741 return ByteBuff.wrap(ByteBuffer.wrap(data)); 742 } 743}