001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME; 021import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BLOCK_ORPHAN_GRACE_PERIOD; 022import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME; 023import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME; 024import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME; 025import static org.junit.Assert.assertEquals; 026import static org.junit.Assert.assertFalse; 027import static org.junit.Assert.assertNotEquals; 028import static org.junit.Assert.assertNotNull; 029import static org.junit.Assert.assertNull; 030import static org.junit.Assert.assertTrue; 031import static org.mockito.Mockito.mock; 032import static org.mockito.Mockito.when; 033 034import java.io.File; 035import java.io.IOException; 036import java.nio.ByteBuffer; 037import java.util.ArrayList; 038import java.util.Arrays; 039import java.util.Collection; 040import java.util.HashMap; 041import java.util.List; 042import java.util.Map; 043import java.util.Set; 044import java.util.concurrent.ThreadLocalRandom; 045import java.util.concurrent.locks.ReentrantReadWriteLock; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.fs.Path; 048import org.apache.hadoop.hbase.HBaseClassTestRule; 049import org.apache.hadoop.hbase.HBaseConfiguration; 050import org.apache.hadoop.hbase.HBaseTestingUtil; 051import org.apache.hadoop.hbase.HConstants; 052import org.apache.hadoop.hbase.Waiter; 053import org.apache.hadoop.hbase.io.ByteBuffAllocator; 054import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 055import org.apache.hadoop.hbase.io.hfile.BlockType; 056import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; 057import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair; 058import org.apache.hadoop.hbase.io.hfile.Cacheable; 059import org.apache.hadoop.hbase.io.hfile.HFileBlock; 060import org.apache.hadoop.hbase.io.hfile.HFileContext; 061import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 062import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo; 063import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics; 064import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache; 065import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; 066import org.apache.hadoop.hbase.nio.ByteBuff; 067import org.apache.hadoop.hbase.regionserver.HRegion; 068import org.apache.hadoop.hbase.regionserver.HStore; 069import org.apache.hadoop.hbase.regionserver.HStoreFile; 070import org.apache.hadoop.hbase.testclassification.IOTests; 071import org.apache.hadoop.hbase.testclassification.LargeTests; 072import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 073import org.apache.hadoop.hbase.util.Pair; 074import org.apache.hadoop.hbase.util.Threads; 075import org.junit.After; 076import org.junit.Assert; 077import org.junit.Before; 078import org.junit.ClassRule; 079import org.junit.Test; 080import org.junit.experimental.categories.Category; 081import org.junit.runner.RunWith; 082import org.junit.runners.Parameterized; 083import org.mockito.Mockito; 084import org.slf4j.Logger; 085import org.slf4j.LoggerFactory; 086 087import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 088 089/** 090 * Basic test of BucketCache.Puts and gets. 091 * <p> 092 * Tests will ensure that blocks' data correctness under several threads concurrency 093 */ 094@RunWith(Parameterized.class) 095@Category({ IOTests.class, LargeTests.class }) 096public class TestBucketCache { 097 098 private static final Logger LOG = LoggerFactory.getLogger(TestBucketCache.class); 099 100 @ClassRule 101 public static final HBaseClassTestRule CLASS_RULE = 102 HBaseClassTestRule.forClass(TestBucketCache.class); 103 104 @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}") 105 public static Iterable<Object[]> data() { 106 return Arrays.asList(new Object[][] { { 8192, null }, // TODO: why is 8k the default blocksize 107 // for these tests? 108 { 16 * 1024, 109 new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, 110 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 111 128 * 1024 + 1024 } } }); 112 } 113 114 @Parameterized.Parameter(0) 115 public int constructedBlockSize; 116 117 @Parameterized.Parameter(1) 118 public int[] constructedBlockSizes; 119 120 BucketCache cache; 121 final int CACHE_SIZE = 1000000; 122 final int NUM_BLOCKS = 100; 123 final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS; 124 final int NUM_THREADS = 100; 125 final int NUM_QUERIES = 10000; 126 127 final long capacitySize = 32 * 1024 * 1024; 128 final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; 129 final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; 130 private String ioEngineName = "offheap"; 131 132 private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil(); 133 134 private static class MockedBucketCache extends BucketCache { 135 136 public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 137 int writerThreads, int writerQLen, String persistencePath) throws IOException { 138 super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen, 139 persistencePath); 140 } 141 142 @Override 143 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { 144 super.cacheBlock(cacheKey, buf, inMemory); 145 } 146 147 @Override 148 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 149 super.cacheBlock(cacheKey, buf); 150 } 151 } 152 153 @Before 154 public void setup() throws IOException { 155 cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize, 156 constructedBlockSizes, writeThreads, writerQLen, null); 157 } 158 159 @After 160 public void tearDown() { 161 cache.shutdown(); 162 } 163 164 /** 165 * Test Utility to create test dir and return name 166 * @return return name of created dir 167 * @throws IOException throws IOException 168 */ 169 private Path createAndGetTestDir() throws IOException { 170 final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir(); 171 HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir); 172 return testDir; 173 } 174 175 /** 176 * Return a random element from {@code a}. 177 */ 178 private static <T> T randFrom(List<T> a) { 179 return a.get(ThreadLocalRandom.current().nextInt(a.size())); 180 } 181 182 @Test 183 public void testBucketAllocator() throws BucketAllocatorException { 184 BucketAllocator mAllocator = cache.getAllocator(); 185 /* 186 * Test the allocator first 187 */ 188 final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024); 189 190 boolean full = false; 191 ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>(); 192 // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until 193 // the cache is completely filled. 194 List<Integer> tmp = new ArrayList<>(BLOCKSIZES); 195 while (!full) { 196 Integer blockSize = null; 197 try { 198 blockSize = randFrom(tmp); 199 allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize)); 200 } catch (CacheFullException cfe) { 201 tmp.remove(blockSize); 202 if (tmp.isEmpty()) full = true; 203 } 204 } 205 206 for (Integer blockSize : BLOCKSIZES) { 207 BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize); 208 IndexStatistics indexStatistics = bucketSizeInfo.statistics(); 209 assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount()); 210 211 // we know the block sizes above are multiples of 1024, but default bucket sizes give an 212 // additional 1024 on top of that so this counts towards fragmentation in our test 213 // real life may have worse fragmentation because blocks may not be perfectly sized to block 214 // size, given encoding/compression and large rows 215 assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes()); 216 } 217 218 mAllocator.logDebugStatistics(); 219 220 for (Pair<Long, Integer> allocation : allocations) { 221 assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()), 222 mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond())); 223 } 224 assertEquals(0, mAllocator.getUsedSize()); 225 } 226 227 @Test 228 public void testCacheSimple() throws Exception { 229 CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES); 230 } 231 232 @Test 233 public void testCacheMultiThreadedSingleKey() throws Exception { 234 CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES); 235 } 236 237 @Test 238 public void testHeapSizeChanges() throws Exception { 239 cache.stopWriterThreads(); 240 CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE); 241 } 242 243 public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) 244 throws InterruptedException { 245 Waiter.waitFor(HBaseConfiguration.create(), 10000, 246 () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey))); 247 } 248 249 public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException { 250 while (!cache.ramCache.isEmpty()) { 251 Thread.sleep(100); 252 } 253 Thread.sleep(1000); 254 } 255 256 // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer 257 // threads will flush it to the bucket and put reference entry in backingMap. 258 private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, 259 Cacheable block, boolean waitWhenCache) throws InterruptedException { 260 cache.cacheBlock(cacheKey, block, false, waitWhenCache); 261 waitUntilFlushedToBucket(cache, cacheKey); 262 } 263 264 @Test 265 public void testMemoryLeak() throws Exception { 266 final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L); 267 cacheAndWaitUntilFlushedToBucket(cache, cacheKey, 268 new CacheTestUtils.ByteArrayCacheable(new byte[10]), true); 269 long lockId = cache.backingMap.get(cacheKey).offset(); 270 ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId); 271 lock.writeLock().lock(); 272 Thread evictThread = new Thread("evict-block") { 273 @Override 274 public void run() { 275 cache.evictBlock(cacheKey); 276 } 277 }; 278 evictThread.start(); 279 cache.offsetLock.waitForWaiters(lockId, 1); 280 cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true); 281 assertEquals(0, cache.getBlockCount()); 282 cacheAndWaitUntilFlushedToBucket(cache, cacheKey, 283 new CacheTestUtils.ByteArrayCacheable(new byte[10]), true); 284 assertEquals(1, cache.getBlockCount()); 285 lock.writeLock().unlock(); 286 evictThread.join(); 287 /** 288 * <pre> 289 * The asserts here before HBASE-21957 are: 290 * assertEquals(1L, cache.getBlockCount()); 291 * assertTrue(cache.getCurrentSize() > 0L); 292 * assertTrue("We should have a block!", cache.iterator().hasNext()); 293 * 294 * The asserts here after HBASE-21957 are: 295 * assertEquals(0, cache.getBlockCount()); 296 * assertEquals(cache.getCurrentSize(), 0L); 297 * 298 * I think the asserts before HBASE-21957 is more reasonable,because 299 * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry} 300 * it had seen, and newly added Block after the {@link BucketEntry} 301 * it had seen should not be evicted. 302 * </pre> 303 */ 304 assertEquals(1L, cache.getBlockCount()); 305 assertTrue(cache.getCurrentSize() > 0L); 306 assertTrue("We should have a block!", cache.iterator().hasNext()); 307 } 308 309 @Test 310 public void testRetrieveFromFile() throws Exception { 311 Path testDir = createAndGetTestDir(); 312 String ioEngineName = "file:" + testDir + "/bucket.cache"; 313 testRetrievalUtils(testDir, ioEngineName); 314 int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; 315 String persistencePath = testDir + "/bucket.persistence"; 316 BucketCache bucketCache = null; 317 try { 318 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 319 smallBucketSizes, writeThreads, writerQLen, persistencePath); 320 assertTrue(bucketCache.waitForCacheInitialization(10000)); 321 assertFalse(new File(persistencePath).exists()); 322 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 323 assertEquals(0, bucketCache.backingMap.size()); 324 } finally { 325 bucketCache.shutdown(); 326 HBASE_TESTING_UTILITY.cleanupTestDir(); 327 } 328 } 329 330 @Test 331 public void testRetrieveFromMMap() throws Exception { 332 final Path testDir = createAndGetTestDir(); 333 final String ioEngineName = "mmap:" + testDir + "/bucket.cache"; 334 testRetrievalUtils(testDir, ioEngineName); 335 } 336 337 @Test 338 public void testRetrieveFromPMem() throws Exception { 339 final Path testDir = createAndGetTestDir(); 340 final String ioEngineName = "pmem:" + testDir + "/bucket.cache"; 341 testRetrievalUtils(testDir, ioEngineName); 342 int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; 343 String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 344 BucketCache bucketCache = null; 345 try { 346 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 347 smallBucketSizes, writeThreads, writerQLen, persistencePath); 348 assertTrue(bucketCache.waitForCacheInitialization(10000)); 349 assertFalse(new File(persistencePath).exists()); 350 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 351 assertEquals(0, bucketCache.backingMap.size()); 352 } finally { 353 bucketCache.shutdown(); 354 HBASE_TESTING_UTILITY.cleanupTestDir(); 355 } 356 } 357 358 private void testRetrievalUtils(Path testDir, String ioEngineName) 359 throws IOException, InterruptedException { 360 final String persistencePath = 361 testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 362 BucketCache bucketCache = null; 363 try { 364 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 365 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 366 assertTrue(bucketCache.waitForCacheInitialization(10000)); 367 long usedSize = bucketCache.getAllocator().getUsedSize(); 368 assertEquals(0, usedSize); 369 HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 370 for (HFileBlockPair block : blocks) { 371 bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); 372 } 373 for (HFileBlockPair block : blocks) { 374 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), 375 false); 376 } 377 usedSize = bucketCache.getAllocator().getUsedSize(); 378 assertNotEquals(0, usedSize); 379 bucketCache.shutdown(); 380 assertTrue(new File(persistencePath).exists()); 381 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 382 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 383 assertTrue(bucketCache.waitForCacheInitialization(10000)); 384 385 assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); 386 } finally { 387 if (bucketCache != null) { 388 bucketCache.shutdown(); 389 } 390 } 391 assertTrue(new File(persistencePath).exists()); 392 } 393 394 @Test 395 public void testRetrieveUnsupportedIOE() throws Exception { 396 try { 397 final Path testDir = createAndGetTestDir(); 398 final String ioEngineName = testDir + "/bucket.cache"; 399 testRetrievalUtils(testDir, ioEngineName); 400 Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!"); 401 } catch (IllegalArgumentException e) { 402 Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, " 403 + "files:, mmap: or offheap", e.getMessage()); 404 } 405 } 406 407 @Test 408 public void testRetrieveFromMultipleFiles() throws Exception { 409 final Path testDirInitial = createAndGetTestDir(); 410 final Path newTestDir = new HBaseTestingUtil().getDataTestDir(); 411 HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir); 412 String ioEngineName = 413 new StringBuilder("files:").append(testDirInitial).append("/bucket1.cache") 414 .append(FileIOEngine.FILE_DELIMITER).append(newTestDir).append("/bucket2.cache").toString(); 415 testRetrievalUtils(testDirInitial, ioEngineName); 416 int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; 417 String persistencePath = testDirInitial + "/bucket.persistence"; 418 BucketCache bucketCache = null; 419 try { 420 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 421 smallBucketSizes, writeThreads, writerQLen, persistencePath); 422 assertTrue(bucketCache.waitForCacheInitialization(10000)); 423 assertFalse(new File(persistencePath).exists()); 424 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 425 assertEquals(0, bucketCache.backingMap.size()); 426 } finally { 427 bucketCache.shutdown(); 428 HBASE_TESTING_UTILITY.cleanupTestDir(); 429 } 430 } 431 432 @Test 433 public void testRetrieveFromFileWithoutPersistence() throws Exception { 434 BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 435 constructedBlockSizes, writeThreads, writerQLen, null); 436 assertTrue(bucketCache.waitForCacheInitialization(10000)); 437 try { 438 final Path testDir = createAndGetTestDir(); 439 String ioEngineName = "file:" + testDir + "/bucket.cache"; 440 long usedSize = bucketCache.getAllocator().getUsedSize(); 441 assertEquals(0, usedSize); 442 HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 443 for (HFileBlockPair block : blocks) { 444 bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); 445 } 446 for (HFileBlockPair block : blocks) { 447 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), 448 false); 449 } 450 usedSize = bucketCache.getAllocator().getUsedSize(); 451 assertNotEquals(0, usedSize); 452 bucketCache.shutdown(); 453 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 454 constructedBlockSizes, writeThreads, writerQLen, null); 455 assertTrue(bucketCache.waitForCacheInitialization(10000)); 456 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 457 } finally { 458 bucketCache.shutdown(); 459 HBASE_TESTING_UTILITY.cleanupTestDir(); 460 } 461 } 462 463 @Test 464 public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException { 465 long availableSpace = 20 * 1024L * 1024 * 1024; 466 int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 }; 467 BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes); 468 assertTrue(allocator.getBuckets().length > 0); 469 } 470 471 @Test 472 public void testGetPartitionSize() throws IOException { 473 // Test default values 474 validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR, 475 BucketCache.DEFAULT_MIN_FACTOR); 476 477 Configuration conf = HBaseConfiguration.create(); 478 conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f); 479 conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f); 480 conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f); 481 conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f); 482 483 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 484 constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); 485 assertTrue(cache.waitForCacheInitialization(10000)); 486 487 validateGetPartitionSize(cache, 0.1f, 0.5f); 488 validateGetPartitionSize(cache, 0.7f, 0.5f); 489 validateGetPartitionSize(cache, 0.2f, 0.5f); 490 } 491 492 @Test 493 public void testCacheSizeCapacity() throws IOException { 494 // Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE 495 validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR, 496 BucketCache.DEFAULT_MIN_FACTOR); 497 Configuration conf = HBaseConfiguration.create(); 498 conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f); 499 conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f); 500 conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f); 501 conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f); 502 try { 503 new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads, 504 writerQLen, null, 100, conf); 505 Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!"); 506 } catch (IllegalArgumentException e) { 507 Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage()); 508 } 509 } 510 511 @Test 512 public void testValidBucketCacheConfigs() throws IOException { 513 Configuration conf = HBaseConfiguration.create(); 514 conf.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f); 515 conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f); 516 conf.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f); 517 conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f); 518 conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f); 519 conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f); 520 521 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 522 constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); 523 assertTrue(cache.waitForCacheInitialization(10000)); 524 525 assertEquals(ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f, 526 cache.getAcceptableFactor(), 0); 527 assertEquals(MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, cache.getMinFactor(), 0); 528 assertEquals(EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, 529 cache.getExtraFreeFactor(), 0); 530 assertEquals(BucketCache.SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f, 531 cache.getSingleFactor(), 0); 532 assertEquals(BucketCache.MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f, 533 cache.getMultiFactor(), 0); 534 assertEquals(BucketCache.MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f, 535 cache.getMemoryFactor(), 0); 536 } 537 538 @Test 539 public void testInvalidAcceptFactorConfig() throws IOException { 540 float[] configValues = { -1f, 0.2f, 0.86f, 1.05f }; 541 boolean[] expectedOutcomes = { false, false, true, false }; 542 Map<String, float[]> configMappings = ImmutableMap.of(ACCEPT_FACTOR_CONFIG_NAME, configValues); 543 Configuration conf = HBaseConfiguration.create(); 544 checkConfigValues(conf, configMappings, expectedOutcomes); 545 } 546 547 @Test 548 public void testInvalidMinFactorConfig() throws IOException { 549 float[] configValues = { -1f, 0f, 0.96f, 1.05f }; 550 // throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0 551 boolean[] expectedOutcomes = { false, true, false, false }; 552 Map<String, float[]> configMappings = ImmutableMap.of(MIN_FACTOR_CONFIG_NAME, configValues); 553 Configuration conf = HBaseConfiguration.create(); 554 checkConfigValues(conf, configMappings, expectedOutcomes); 555 } 556 557 @Test 558 public void testInvalidExtraFreeFactorConfig() throws IOException { 559 float[] configValues = { -1f, 0f, 0.2f, 1.05f }; 560 // throws due to <0, in expected range, in expected range, config can be > 1.0 561 boolean[] expectedOutcomes = { false, true, true, true }; 562 Map<String, float[]> configMappings = 563 ImmutableMap.of(EXTRA_FREE_FACTOR_CONFIG_NAME, configValues); 564 Configuration conf = HBaseConfiguration.create(); 565 checkConfigValues(conf, configMappings, expectedOutcomes); 566 } 567 568 @Test 569 public void testInvalidCacheSplitFactorConfig() throws IOException { 570 float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f }; 571 float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f }; 572 float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f }; 573 // All configs add up to 1.0 and are between 0 and 1.0, configs don't add to 1.0, configs can't 574 // be negative, configs don't add to 1.0 575 boolean[] expectedOutcomes = { true, false, false, false }; 576 Map<String, 577 float[]> configMappings = ImmutableMap.of(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 578 singleFactorConfigValues, BucketCache.MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues, 579 BucketCache.MEMORY_FACTOR_CONFIG_NAME, memoryFactorConfigValues); 580 Configuration conf = HBaseConfiguration.create(); 581 checkConfigValues(conf, configMappings, expectedOutcomes); 582 } 583 584 private void checkConfigValues(Configuration conf, Map<String, float[]> configMap, 585 boolean[] expectSuccess) throws IOException { 586 Set<String> configNames = configMap.keySet(); 587 for (int i = 0; i < expectSuccess.length; i++) { 588 try { 589 for (String configName : configNames) { 590 conf.setFloat(configName, configMap.get(configName)[i]); 591 } 592 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 593 constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); 594 assertTrue(cache.waitForCacheInitialization(10000)); 595 assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i] 596 + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]); 597 } catch (IllegalArgumentException e) { 598 assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i] 599 + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]); 600 } 601 } 602 } 603 604 private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor, 605 float minFactor) { 606 long expectedOutput = 607 (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor); 608 assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor)); 609 } 610 611 @Test 612 public void testOffsetProducesPositiveOutput() { 613 // This number is picked because it produces negative output if the values isn't ensured to be 614 // positive. See HBASE-18757 for more information. 615 long testValue = 549888460800L; 616 BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> { 617 return ByteBuffAllocator.NONE; 618 }, ByteBuffAllocator.HEAP); 619 assertEquals(testValue, bucketEntry.offset()); 620 } 621 622 @Test 623 public void testEvictionCount() throws InterruptedException { 624 int size = 100; 625 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 626 ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size); 627 HFileContext meta = new HFileContextBuilder().build(); 628 ByteBuffAllocator allocator = ByteBuffAllocator.HEAP; 629 HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 630 ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator); 631 HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 632 ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator); 633 634 BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0); 635 ByteBuffer actualBuffer = ByteBuffer.allocate(length); 636 ByteBuffer block1Buffer = ByteBuffer.allocate(length); 637 ByteBuffer block2Buffer = ByteBuffer.allocate(length); 638 blockWithNextBlockMetadata.serialize(block1Buffer, true); 639 blockWithoutNextBlockMetadata.serialize(block2Buffer, true); 640 641 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back. 642 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 643 block1Buffer); 644 645 waitUntilFlushedToBucket(cache, key); 646 647 assertEquals(0, cache.getStats().getEvictionCount()); 648 649 // evict call should return 1, but then eviction count be 0 650 assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount")); 651 assertEquals(0, cache.getStats().getEvictionCount()); 652 653 // add back 654 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 655 block1Buffer); 656 waitUntilFlushedToBucket(cache, key); 657 658 // should not increment 659 assertTrue(cache.evictBlock(key)); 660 assertEquals(0, cache.getStats().getEvictionCount()); 661 662 // add back 663 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 664 block1Buffer); 665 waitUntilFlushedToBucket(cache, key); 666 667 // should finally increment eviction count 668 cache.freeSpace("testing"); 669 assertEquals(1, cache.getStats().getEvictionCount()); 670 } 671 672 @Test 673 public void testCacheBlockNextBlockMetadataMissing() throws Exception { 674 int size = 100; 675 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 676 ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size); 677 HFileContext meta = new HFileContextBuilder().build(); 678 ByteBuffAllocator allocator = ByteBuffAllocator.HEAP; 679 HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 680 ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator); 681 HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 682 ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator); 683 684 BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0); 685 ByteBuffer actualBuffer = ByteBuffer.allocate(length); 686 ByteBuffer block1Buffer = ByteBuffer.allocate(length); 687 ByteBuffer block2Buffer = ByteBuffer.allocate(length); 688 blockWithNextBlockMetadata.serialize(block1Buffer, true); 689 blockWithoutNextBlockMetadata.serialize(block2Buffer, true); 690 691 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back. 692 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 693 block1Buffer); 694 695 waitUntilFlushedToBucket(cache, key); 696 assertNotNull(cache.backingMap.get(key)); 697 assertEquals(1, cache.backingMap.get(key).refCnt()); 698 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 699 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 700 701 // Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back. 702 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, 703 block1Buffer); 704 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 705 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 706 assertEquals(1, cache.backingMap.get(key).refCnt()); 707 708 // Clear and add blockWithoutNextBlockMetadata 709 assertTrue(cache.evictBlock(key)); 710 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 711 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 712 713 assertNull(cache.getBlock(key, false, false, false)); 714 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, 715 block2Buffer); 716 717 waitUntilFlushedToBucket(cache, key); 718 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 719 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 720 721 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace. 722 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 723 block1Buffer); 724 725 waitUntilFlushedToBucket(cache, key); 726 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 727 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 728 } 729 730 @Test 731 public void testRAMCache() { 732 int size = 100; 733 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 734 byte[] byteArr = new byte[length]; 735 ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size); 736 HFileContext meta = new HFileContextBuilder().build(); 737 738 RAMCache cache = new RAMCache(); 739 BlockCacheKey key1 = new BlockCacheKey("file-1", 1); 740 BlockCacheKey key2 = new BlockCacheKey("file-2", 2); 741 HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 742 HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP); 743 HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 744 HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP); 745 RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false); 746 RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false); 747 748 assertFalse(cache.containsKey(key1)); 749 assertNull(cache.putIfAbsent(key1, re1)); 750 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 751 752 assertNotNull(cache.putIfAbsent(key1, re2)); 753 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 754 assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 755 756 assertNull(cache.putIfAbsent(key2, re2)); 757 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 758 assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 759 760 cache.remove(key1); 761 assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 762 assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 763 764 cache.clear(); 765 assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 766 assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 767 } 768 769 @Test 770 public void testFreeBlockWhenIOEngineWriteFailure() throws IOException { 771 // initialize an block. 772 int size = 100, offset = 20; 773 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 774 ByteBuffer buf = ByteBuffer.allocate(length); 775 HFileContext meta = new HFileContextBuilder().build(); 776 HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 777 HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP); 778 779 // initialize an mocked ioengine. 780 IOEngine ioEngine = Mockito.mock(IOEngine.class); 781 when(ioEngine.usesSharedMemory()).thenReturn(false); 782 // Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong()); 783 Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class), 784 Mockito.anyLong()); 785 Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class), 786 Mockito.anyLong()); 787 788 // create an bucket allocator. 789 long availableSpace = 1024 * 1024 * 1024L; 790 BucketAllocator allocator = new BucketAllocator(availableSpace, null); 791 792 BlockCacheKey key = new BlockCacheKey("dummy", 1L); 793 RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false); 794 795 Assert.assertEquals(0, allocator.getUsedSize()); 796 try { 797 re.writeToCache(ioEngine, allocator, null, null, 798 ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE)); 799 Assert.fail(); 800 } catch (Exception e) { 801 } 802 Assert.assertEquals(0, allocator.getUsedSize()); 803 } 804 805 /** 806 * This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file 807 * could not be freed even if corresponding {@link HFileBlock} is evicted from 808 * {@link BucketCache}. 809 */ 810 @Test 811 public void testFreeBucketEntryRestoredFromFile() throws Exception { 812 BucketCache bucketCache = null; 813 try { 814 final Path dataTestDir = createAndGetTestDir(); 815 816 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 817 String persistencePath = dataTestDir + "/bucketNoRecycler.persistence"; 818 819 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 820 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 821 assertTrue(bucketCache.waitForCacheInitialization(10000)); 822 long usedByteSize = bucketCache.getAllocator().getUsedSize(); 823 assertEquals(0, usedByteSize); 824 825 HFileBlockPair[] hfileBlockPairs = 826 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 827 // Add blocks 828 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 829 bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock()); 830 } 831 832 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 833 cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(), 834 hfileBlockPair.getBlock(), false); 835 } 836 usedByteSize = bucketCache.getAllocator().getUsedSize(); 837 assertNotEquals(0, usedByteSize); 838 // persist cache to file 839 bucketCache.shutdown(); 840 assertTrue(new File(persistencePath).exists()); 841 842 // restore cache from file 843 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 844 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 845 assertTrue(bucketCache.waitForCacheInitialization(10000)); 846 assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); 847 848 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 849 BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName(); 850 bucketCache.evictBlock(blockCacheKey); 851 } 852 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 853 assertEquals(0, bucketCache.backingMap.size()); 854 } finally { 855 bucketCache.shutdown(); 856 HBASE_TESTING_UTILITY.cleanupTestDir(); 857 } 858 } 859 860 @Test 861 public void testBlockAdditionWaitWhenCache() throws Exception { 862 BucketCache bucketCache = null; 863 try { 864 final Path dataTestDir = createAndGetTestDir(); 865 866 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 867 String persistencePath = dataTestDir + "/bucketNoRecycler.persistence"; 868 869 Configuration config = HBASE_TESTING_UTILITY.getConfiguration(); 870 config.setLong(QUEUE_ADDITION_WAIT_TIME, 1000); 871 872 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 873 constructedBlockSizes, 1, 1, persistencePath); 874 assertTrue(bucketCache.waitForCacheInitialization(10000)); 875 long usedByteSize = bucketCache.getAllocator().getUsedSize(); 876 assertEquals(0, usedByteSize); 877 878 HFileBlockPair[] hfileBlockPairs = 879 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10); 880 // Add blocks 881 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 882 bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, 883 true); 884 } 885 886 // Max wait for 10 seconds. 887 long timeout = 10000; 888 // Wait for blocks size to match the number of blocks. 889 while (bucketCache.backingMap.size() != 10) { 890 if (timeout <= 0) break; 891 Threads.sleep(100); 892 timeout -= 100; 893 } 894 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 895 assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName())); 896 } 897 usedByteSize = bucketCache.getAllocator().getUsedSize(); 898 assertNotEquals(0, usedByteSize); 899 // persist cache to file 900 bucketCache.shutdown(); 901 assertTrue(new File(persistencePath).exists()); 902 903 // restore cache from file 904 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 905 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 906 assertTrue(bucketCache.waitForCacheInitialization(10000)); 907 assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); 908 909 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 910 BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName(); 911 bucketCache.evictBlock(blockCacheKey); 912 } 913 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 914 assertEquals(0, bucketCache.backingMap.size()); 915 } finally { 916 if (bucketCache != null) { 917 bucketCache.shutdown(); 918 } 919 HBASE_TESTING_UTILITY.cleanupTestDir(); 920 } 921 } 922 923 @Test 924 public void testNotifyFileCachingCompletedSuccess() throws Exception { 925 BucketCache bucketCache = null; 926 try { 927 Path filePath = 928 new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testNotifyFileCachingCompletedSuccess"); 929 bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10); 930 if (bucketCache.getStats().getFailedInserts() > 0) { 931 LOG.info("There were {} fail inserts, " 932 + "will assert if total blocks in backingMap equals (10 - failInserts) " 933 + "and file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts()); 934 assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size()); 935 assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 936 } else { 937 assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 938 } 939 } finally { 940 if (bucketCache != null) { 941 bucketCache.shutdown(); 942 } 943 HBASE_TESTING_UTILITY.cleanupTestDir(); 944 } 945 } 946 947 @Test 948 public void testNotifyFileCachingCompletedNotAllCached() throws Exception { 949 BucketCache bucketCache = null; 950 try { 951 Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), 952 "testNotifyFileCachingCompletedNotAllCached"); 953 // Deliberately passing more blocks than we have created to test that 954 // notifyFileCachingCompleted will not consider the file fully cached 955 bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 12); 956 assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 957 } finally { 958 if (bucketCache != null) { 959 bucketCache.shutdown(); 960 } 961 HBASE_TESTING_UTILITY.cleanupTestDir(); 962 } 963 } 964 965 private BucketCache testNotifyFileCachingCompletedForTenBlocks(Path filePath, 966 int totalBlocksToCheck) throws Exception { 967 final Path dataTestDir = createAndGetTestDir(); 968 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 969 BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 970 constructedBlockSizes, 1, 1, null); 971 assertTrue(bucketCache.waitForCacheInitialization(10000)); 972 long usedByteSize = bucketCache.getAllocator().getUsedSize(); 973 assertEquals(0, usedByteSize); 974 HFileBlockPair[] hfileBlockPairs = 975 CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, filePath); 976 // Add blocks 977 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 978 bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, true); 979 } 980 bucketCache.notifyFileCachingCompleted(filePath, totalBlocksToCheck, totalBlocksToCheck, 981 totalBlocksToCheck * constructedBlockSize); 982 return bucketCache; 983 } 984 985 @Test 986 public void testEvictOrphansOutOfGracePeriod() throws Exception { 987 BucketCache bucketCache = testEvictOrphans(0); 988 assertEquals(10, bucketCache.getBackingMap().size()); 989 assertEquals(0, bucketCache.blocksByHFile.stream() 990 .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count()); 991 } 992 993 @Test 994 public void testEvictOrphansWithinGracePeriod() throws Exception { 995 BucketCache bucketCache = testEvictOrphans(60 * 60 * 1000L); 996 assertEquals(18, bucketCache.getBackingMap().size()); 997 assertTrue(bucketCache.blocksByHFile.stream() 998 .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count() > 0); 999 } 1000 1001 private BucketCache testEvictOrphans(long orphanEvictionGracePeriod) throws Exception { 1002 Path validFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-valid"); 1003 Path orphanFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-orphan"); 1004 Map<String, HRegion> onlineRegions = new HashMap<>(); 1005 List<HStore> stores = new ArrayList<>(); 1006 Collection<HStoreFile> storeFiles = new ArrayList<>(); 1007 HRegion mockedRegion = mock(HRegion.class); 1008 HStore mockedStore = mock(HStore.class); 1009 HStoreFile mockedStoreFile = mock(HStoreFile.class); 1010 when(mockedStoreFile.getPath()).thenReturn(validFile); 1011 storeFiles.add(mockedStoreFile); 1012 when(mockedStore.getStorefiles()).thenReturn(storeFiles); 1013 stores.add(mockedStore); 1014 when(mockedRegion.getStores()).thenReturn(stores); 1015 onlineRegions.put("mocked_region", mockedRegion); 1016 HBASE_TESTING_UTILITY.getConfiguration().setDouble(MIN_FACTOR_CONFIG_NAME, 0.99); 1017 HBASE_TESTING_UTILITY.getConfiguration().setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1); 1018 HBASE_TESTING_UTILITY.getConfiguration().setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01); 1019 HBASE_TESTING_UTILITY.getConfiguration().setLong(BLOCK_ORPHAN_GRACE_PERIOD, 1020 orphanEvictionGracePeriod); 1021 BucketCache bucketCache = new BucketCache(ioEngineName, (constructedBlockSize + 1024) * 21, 1022 constructedBlockSize, new int[] { constructedBlockSize + 1024 }, 1, 1, null, 60 * 1000, 1023 HBASE_TESTING_UTILITY.getConfiguration(), onlineRegions); 1024 HFileBlockPair[] validBlockPairs = 1025 CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, validFile); 1026 HFileBlockPair[] orphanBlockPairs = 1027 CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, orphanFile); 1028 for (HFileBlockPair pair : validBlockPairs) { 1029 bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true); 1030 } 1031 waitUntilAllFlushedToBucket(bucketCache); 1032 assertEquals(10, bucketCache.getBackingMap().size()); 1033 bucketCache.freeSpace("test"); 1034 assertEquals(10, bucketCache.getBackingMap().size()); 1035 for (HFileBlockPair pair : orphanBlockPairs) { 1036 bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true); 1037 } 1038 waitUntilAllFlushedToBucket(bucketCache); 1039 assertEquals(20, bucketCache.getBackingMap().size()); 1040 bucketCache.freeSpace("test"); 1041 return bucketCache; 1042 } 1043}