001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; 021import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME; 022import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE; 023import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BLOCK_ORPHAN_GRACE_PERIOD; 024import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION; 025import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_MIN_FACTOR; 026import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_SINGLE_FACTOR; 027import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME; 028import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MEMORY_FACTOR_CONFIG_NAME; 029import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME; 030import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MULTI_FACTOR_CONFIG_NAME; 031import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME; 032import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.SINGLE_FACTOR_CONFIG_NAME; 033import static org.junit.Assert.assertEquals; 034import static org.junit.Assert.assertFalse; 035import static org.junit.Assert.assertNotEquals; 036import static org.junit.Assert.assertNotNull; 037import static org.junit.Assert.assertNull; 038import static org.junit.Assert.assertTrue; 039import static org.mockito.Mockito.mock; 040import static org.mockito.Mockito.when; 041 042import java.io.File; 043import java.io.IOException; 044import java.nio.ByteBuffer; 045import java.util.ArrayList; 046import java.util.Arrays; 047import java.util.Collection; 048import java.util.HashMap; 049import java.util.List; 050import java.util.Map; 051import java.util.Set; 052import java.util.concurrent.ThreadLocalRandom; 053import java.util.concurrent.locks.ReentrantReadWriteLock; 054import org.apache.hadoop.conf.Configuration; 055import org.apache.hadoop.fs.Path; 056import org.apache.hadoop.hbase.HBaseClassTestRule; 057import org.apache.hadoop.hbase.HBaseConfiguration; 058import org.apache.hadoop.hbase.HBaseTestingUtil; 059import org.apache.hadoop.hbase.HConstants; 060import org.apache.hadoop.hbase.Waiter; 061import org.apache.hadoop.hbase.io.ByteBuffAllocator; 062import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 063import org.apache.hadoop.hbase.io.hfile.BlockType; 064import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; 065import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair; 066import org.apache.hadoop.hbase.io.hfile.Cacheable; 067import org.apache.hadoop.hbase.io.hfile.HFileBlock; 068import org.apache.hadoop.hbase.io.hfile.HFileContext; 069import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 070import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo; 071import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics; 072import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache; 073import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; 074import org.apache.hadoop.hbase.nio.ByteBuff; 075import org.apache.hadoop.hbase.regionserver.HRegion; 076import org.apache.hadoop.hbase.regionserver.HStore; 077import org.apache.hadoop.hbase.regionserver.HStoreFile; 078import org.apache.hadoop.hbase.testclassification.IOTests; 079import org.apache.hadoop.hbase.testclassification.LargeTests; 080import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 081import org.apache.hadoop.hbase.util.Pair; 082import org.apache.hadoop.hbase.util.Threads; 083import org.junit.After; 084import org.junit.Assert; 085import org.junit.Before; 086import org.junit.ClassRule; 087import org.junit.Test; 088import org.junit.experimental.categories.Category; 089import org.junit.runner.RunWith; 090import org.junit.runners.Parameterized; 091import org.mockito.Mockito; 092import org.slf4j.Logger; 093import org.slf4j.LoggerFactory; 094 095import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 096 097/** 098 * Basic test of BucketCache.Puts and gets. 099 * <p> 100 * Tests will ensure that blocks' data correctness under several threads concurrency 101 */ 102@RunWith(Parameterized.class) 103@Category({ IOTests.class, LargeTests.class }) 104public class TestBucketCache { 105 106 private static final Logger LOG = LoggerFactory.getLogger(TestBucketCache.class); 107 108 @ClassRule 109 public static final HBaseClassTestRule CLASS_RULE = 110 HBaseClassTestRule.forClass(TestBucketCache.class); 111 112 @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}") 113 public static Iterable<Object[]> data() { 114 return Arrays.asList(new Object[][] { { 8192, null }, // TODO: why is 8k the default blocksize 115 // for these tests? 116 { 16 * 1024, 117 new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, 118 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 119 128 * 1024 + 1024 } } }); 120 } 121 122 @Parameterized.Parameter(0) 123 public int constructedBlockSize; 124 125 @Parameterized.Parameter(1) 126 public int[] constructedBlockSizes; 127 128 BucketCache cache; 129 final int CACHE_SIZE = 1000000; 130 final int NUM_BLOCKS = 100; 131 final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS; 132 final int NUM_THREADS = 100; 133 final int NUM_QUERIES = 10000; 134 135 final long capacitySize = 32 * 1024 * 1024; 136 final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; 137 final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; 138 private String ioEngineName = "offheap"; 139 140 private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil(); 141 142 private static class MockedBucketCache extends BucketCache { 143 144 public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 145 int writerThreads, int writerQLen, String persistencePath) throws IOException { 146 super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen, 147 persistencePath); 148 } 149 150 @Override 151 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { 152 super.cacheBlock(cacheKey, buf, inMemory); 153 } 154 155 @Override 156 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 157 super.cacheBlock(cacheKey, buf); 158 } 159 } 160 161 @Before 162 public void setup() throws IOException { 163 cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize, 164 constructedBlockSizes, writeThreads, writerQLen, null); 165 } 166 167 @After 168 public void tearDown() { 169 cache.shutdown(); 170 } 171 172 /** 173 * Test Utility to create test dir and return name 174 * @return return name of created dir 175 * @throws IOException throws IOException 176 */ 177 private Path createAndGetTestDir() throws IOException { 178 final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir(); 179 HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir); 180 return testDir; 181 } 182 183 /** 184 * Return a random element from {@code a}. 185 */ 186 private static <T> T randFrom(List<T> a) { 187 return a.get(ThreadLocalRandom.current().nextInt(a.size())); 188 } 189 190 @Test 191 public void testBucketAllocator() throws BucketAllocatorException { 192 BucketAllocator mAllocator = cache.getAllocator(); 193 /* 194 * Test the allocator first 195 */ 196 final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024); 197 198 boolean full = false; 199 ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>(); 200 // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until 201 // the cache is completely filled. 202 List<Integer> tmp = new ArrayList<>(BLOCKSIZES); 203 while (!full) { 204 Integer blockSize = null; 205 try { 206 blockSize = randFrom(tmp); 207 allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize)); 208 } catch (CacheFullException cfe) { 209 tmp.remove(blockSize); 210 if (tmp.isEmpty()) full = true; 211 } 212 } 213 214 for (Integer blockSize : BLOCKSIZES) { 215 BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize); 216 IndexStatistics indexStatistics = bucketSizeInfo.statistics(); 217 assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount()); 218 219 // we know the block sizes above are multiples of 1024, but default bucket sizes give an 220 // additional 1024 on top of that so this counts towards fragmentation in our test 221 // real life may have worse fragmentation because blocks may not be perfectly sized to block 222 // size, given encoding/compression and large rows 223 assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes()); 224 } 225 226 mAllocator.logDebugStatistics(); 227 228 for (Pair<Long, Integer> allocation : allocations) { 229 assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()), 230 mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond())); 231 } 232 assertEquals(0, mAllocator.getUsedSize()); 233 } 234 235 @Test 236 public void testCacheSimple() throws Exception { 237 CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES); 238 } 239 240 @Test 241 public void testCacheMultiThreadedSingleKey() throws Exception { 242 CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES); 243 } 244 245 @Test 246 public void testHeapSizeChanges() throws Exception { 247 cache.stopWriterThreads(); 248 CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE); 249 } 250 251 public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) 252 throws InterruptedException { 253 Waiter.waitFor(HBaseConfiguration.create(), 10000, 254 () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey))); 255 } 256 257 public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException { 258 while (!cache.ramCache.isEmpty()) { 259 Thread.sleep(100); 260 } 261 Thread.sleep(1000); 262 } 263 264 // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer 265 // threads will flush it to the bucket and put reference entry in backingMap. 266 private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, 267 Cacheable block, boolean waitWhenCache) throws InterruptedException { 268 cache.cacheBlock(cacheKey, block, false, waitWhenCache); 269 waitUntilFlushedToBucket(cache, cacheKey); 270 } 271 272 @Test 273 public void testMemoryLeak() throws Exception { 274 final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L); 275 cacheAndWaitUntilFlushedToBucket(cache, cacheKey, 276 new CacheTestUtils.ByteArrayCacheable(new byte[10]), true); 277 long lockId = cache.backingMap.get(cacheKey).offset(); 278 ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId); 279 lock.writeLock().lock(); 280 Thread evictThread = new Thread("evict-block") { 281 @Override 282 public void run() { 283 cache.evictBlock(cacheKey); 284 } 285 }; 286 evictThread.start(); 287 cache.offsetLock.waitForWaiters(lockId, 1); 288 cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true); 289 assertEquals(0, cache.getBlockCount()); 290 cacheAndWaitUntilFlushedToBucket(cache, cacheKey, 291 new CacheTestUtils.ByteArrayCacheable(new byte[10]), true); 292 assertEquals(1, cache.getBlockCount()); 293 lock.writeLock().unlock(); 294 evictThread.join(); 295 /** 296 * <pre> 297 * The asserts here before HBASE-21957 are: 298 * assertEquals(1L, cache.getBlockCount()); 299 * assertTrue(cache.getCurrentSize() > 0L); 300 * assertTrue("We should have a block!", cache.iterator().hasNext()); 301 * 302 * The asserts here after HBASE-21957 are: 303 * assertEquals(0, cache.getBlockCount()); 304 * assertEquals(cache.getCurrentSize(), 0L); 305 * 306 * I think the asserts before HBASE-21957 is more reasonable,because 307 * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry} 308 * it had seen, and newly added Block after the {@link BucketEntry} 309 * it had seen should not be evicted. 310 * </pre> 311 */ 312 assertEquals(1L, cache.getBlockCount()); 313 assertTrue(cache.getCurrentSize() > 0L); 314 assertTrue("We should have a block!", cache.iterator().hasNext()); 315 } 316 317 @Test 318 public void testRetrieveFromFile() throws Exception { 319 Path testDir = createAndGetTestDir(); 320 String ioEngineName = "file:" + testDir + "/bucket.cache"; 321 testRetrievalUtils(testDir, ioEngineName); 322 int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; 323 String persistencePath = testDir + "/bucket.persistence"; 324 BucketCache bucketCache = null; 325 try { 326 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 327 smallBucketSizes, writeThreads, writerQLen, persistencePath); 328 assertTrue(bucketCache.waitForCacheInitialization(10000)); 329 assertFalse(new File(persistencePath).exists()); 330 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 331 assertEquals(0, bucketCache.backingMap.size()); 332 } finally { 333 bucketCache.shutdown(); 334 HBASE_TESTING_UTILITY.cleanupTestDir(); 335 } 336 } 337 338 @Test 339 public void testRetrieveFromMMap() throws Exception { 340 final Path testDir = createAndGetTestDir(); 341 final String ioEngineName = "mmap:" + testDir + "/bucket.cache"; 342 testRetrievalUtils(testDir, ioEngineName); 343 } 344 345 @Test 346 public void testRetrieveFromPMem() throws Exception { 347 final Path testDir = createAndGetTestDir(); 348 final String ioEngineName = "pmem:" + testDir + "/bucket.cache"; 349 testRetrievalUtils(testDir, ioEngineName); 350 int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; 351 String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 352 BucketCache bucketCache = null; 353 try { 354 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 355 smallBucketSizes, writeThreads, writerQLen, persistencePath); 356 assertTrue(bucketCache.waitForCacheInitialization(10000)); 357 assertFalse(new File(persistencePath).exists()); 358 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 359 assertEquals(0, bucketCache.backingMap.size()); 360 } finally { 361 bucketCache.shutdown(); 362 HBASE_TESTING_UTILITY.cleanupTestDir(); 363 } 364 } 365 366 private void testRetrievalUtils(Path testDir, String ioEngineName) 367 throws IOException, InterruptedException { 368 final String persistencePath = 369 testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 370 BucketCache bucketCache = null; 371 try { 372 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 373 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 374 assertTrue(bucketCache.waitForCacheInitialization(10000)); 375 long usedSize = bucketCache.getAllocator().getUsedSize(); 376 assertEquals(0, usedSize); 377 HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 378 for (HFileBlockPair block : blocks) { 379 bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); 380 } 381 for (HFileBlockPair block : blocks) { 382 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), 383 false); 384 } 385 usedSize = bucketCache.getAllocator().getUsedSize(); 386 assertNotEquals(0, usedSize); 387 bucketCache.shutdown(); 388 assertTrue(new File(persistencePath).exists()); 389 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 390 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 391 assertTrue(bucketCache.waitForCacheInitialization(10000)); 392 393 assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); 394 } finally { 395 if (bucketCache != null) { 396 bucketCache.shutdown(); 397 } 398 } 399 assertTrue(new File(persistencePath).exists()); 400 } 401 402 @Test 403 public void testRetrieveUnsupportedIOE() throws Exception { 404 try { 405 final Path testDir = createAndGetTestDir(); 406 final String ioEngineName = testDir + "/bucket.cache"; 407 testRetrievalUtils(testDir, ioEngineName); 408 Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!"); 409 } catch (IllegalArgumentException e) { 410 Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, " 411 + "files:, mmap: or offheap", e.getMessage()); 412 } 413 } 414 415 @Test 416 public void testRetrieveFromMultipleFiles() throws Exception { 417 final Path testDirInitial = createAndGetTestDir(); 418 final Path newTestDir = new HBaseTestingUtil().getDataTestDir(); 419 HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir); 420 String ioEngineName = 421 new StringBuilder("files:").append(testDirInitial).append("/bucket1.cache") 422 .append(FileIOEngine.FILE_DELIMITER).append(newTestDir).append("/bucket2.cache").toString(); 423 testRetrievalUtils(testDirInitial, ioEngineName); 424 int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; 425 String persistencePath = testDirInitial + "/bucket.persistence"; 426 BucketCache bucketCache = null; 427 try { 428 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 429 smallBucketSizes, writeThreads, writerQLen, persistencePath); 430 assertTrue(bucketCache.waitForCacheInitialization(10000)); 431 assertFalse(new File(persistencePath).exists()); 432 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 433 assertEquals(0, bucketCache.backingMap.size()); 434 } finally { 435 bucketCache.shutdown(); 436 HBASE_TESTING_UTILITY.cleanupTestDir(); 437 } 438 } 439 440 @Test 441 public void testRetrieveFromFileWithoutPersistence() throws Exception { 442 BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 443 constructedBlockSizes, writeThreads, writerQLen, null); 444 assertTrue(bucketCache.waitForCacheInitialization(10000)); 445 try { 446 final Path testDir = createAndGetTestDir(); 447 String ioEngineName = "file:" + testDir + "/bucket.cache"; 448 long usedSize = bucketCache.getAllocator().getUsedSize(); 449 assertEquals(0, usedSize); 450 HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 451 for (HFileBlockPair block : blocks) { 452 bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); 453 } 454 for (HFileBlockPair block : blocks) { 455 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), 456 false); 457 } 458 usedSize = bucketCache.getAllocator().getUsedSize(); 459 assertNotEquals(0, usedSize); 460 bucketCache.shutdown(); 461 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 462 constructedBlockSizes, writeThreads, writerQLen, null); 463 assertTrue(bucketCache.waitForCacheInitialization(10000)); 464 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 465 } finally { 466 bucketCache.shutdown(); 467 HBASE_TESTING_UTILITY.cleanupTestDir(); 468 } 469 } 470 471 @Test 472 public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException { 473 long availableSpace = 20 * 1024L * 1024 * 1024; 474 int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 }; 475 BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes); 476 assertTrue(allocator.getBuckets().length > 0); 477 } 478 479 @Test 480 public void testGetPartitionSize() throws IOException { 481 // Test default values 482 validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR); 483 484 Configuration conf = HBaseConfiguration.create(); 485 conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f); 486 conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f); 487 conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f); 488 conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f); 489 490 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 491 constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); 492 assertTrue(cache.waitForCacheInitialization(10000)); 493 494 validateGetPartitionSize(cache, 0.1f, 0.5f); 495 validateGetPartitionSize(cache, 0.7f, 0.5f); 496 validateGetPartitionSize(cache, 0.2f, 0.5f); 497 } 498 499 @Test 500 public void testCacheSizeCapacity() throws IOException { 501 // Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE 502 validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR); 503 Configuration conf = HBaseConfiguration.create(); 504 conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f); 505 conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f); 506 conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f); 507 conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f); 508 try { 509 new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads, 510 writerQLen, null, 100, conf); 511 Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!"); 512 } catch (IllegalArgumentException e) { 513 Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage()); 514 } 515 } 516 517 @Test 518 public void testValidBucketCacheConfigs() throws IOException { 519 Configuration conf = HBaseConfiguration.create(); 520 conf.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f); 521 conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f); 522 conf.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f); 523 conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f); 524 conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f); 525 conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f); 526 527 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 528 constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); 529 assertTrue(cache.waitForCacheInitialization(10000)); 530 531 assertEquals(ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f, 532 cache.getAcceptableFactor(), 0); 533 assertEquals(MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, cache.getMinFactor(), 0); 534 assertEquals(EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, 535 cache.getExtraFreeFactor(), 0); 536 assertEquals(SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f, cache.getSingleFactor(), 537 0); 538 assertEquals(MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f, cache.getMultiFactor(), 539 0); 540 assertEquals(MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f, cache.getMemoryFactor(), 541 0); 542 } 543 544 @Test 545 public void testInvalidAcceptFactorConfig() throws IOException { 546 float[] configValues = { -1f, 0.2f, 0.86f, 1.05f }; 547 boolean[] expectedOutcomes = { false, false, true, false }; 548 Map<String, float[]> configMappings = ImmutableMap.of(ACCEPT_FACTOR_CONFIG_NAME, configValues); 549 Configuration conf = HBaseConfiguration.create(); 550 checkConfigValues(conf, configMappings, expectedOutcomes); 551 } 552 553 @Test 554 public void testInvalidMinFactorConfig() throws IOException { 555 float[] configValues = { -1f, 0f, 0.96f, 1.05f }; 556 // throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0 557 boolean[] expectedOutcomes = { false, true, false, false }; 558 Map<String, float[]> configMappings = ImmutableMap.of(MIN_FACTOR_CONFIG_NAME, configValues); 559 Configuration conf = HBaseConfiguration.create(); 560 checkConfigValues(conf, configMappings, expectedOutcomes); 561 } 562 563 @Test 564 public void testInvalidExtraFreeFactorConfig() throws IOException { 565 float[] configValues = { -1f, 0f, 0.2f, 1.05f }; 566 // throws due to <0, in expected range, in expected range, config can be > 1.0 567 boolean[] expectedOutcomes = { false, true, true, true }; 568 Map<String, float[]> configMappings = 569 ImmutableMap.of(EXTRA_FREE_FACTOR_CONFIG_NAME, configValues); 570 Configuration conf = HBaseConfiguration.create(); 571 checkConfigValues(conf, configMappings, expectedOutcomes); 572 } 573 574 @Test 575 public void testInvalidCacheSplitFactorConfig() throws IOException { 576 float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f }; 577 float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f }; 578 float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f }; 579 // All configs add up to 1.0 and are between 0 and 1.0, configs don't add to 1.0, configs can't 580 // be negative, configs don't add to 1.0 581 boolean[] expectedOutcomes = { true, false, false, false }; 582 Map<String, 583 float[]> configMappings = ImmutableMap.of(SINGLE_FACTOR_CONFIG_NAME, singleFactorConfigValues, 584 MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues, MEMORY_FACTOR_CONFIG_NAME, 585 memoryFactorConfigValues); 586 Configuration conf = HBaseConfiguration.create(); 587 checkConfigValues(conf, configMappings, expectedOutcomes); 588 } 589 590 private void checkConfigValues(Configuration conf, Map<String, float[]> configMap, 591 boolean[] expectSuccess) throws IOException { 592 Set<String> configNames = configMap.keySet(); 593 for (int i = 0; i < expectSuccess.length; i++) { 594 try { 595 for (String configName : configNames) { 596 conf.setFloat(configName, configMap.get(configName)[i]); 597 } 598 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 599 constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); 600 assertTrue(cache.waitForCacheInitialization(10000)); 601 assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i] 602 + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]); 603 } catch (IllegalArgumentException e) { 604 assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i] 605 + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]); 606 } 607 } 608 } 609 610 private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor, 611 float minFactor) { 612 long expectedOutput = 613 (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor); 614 assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor)); 615 } 616 617 @Test 618 public void testOffsetProducesPositiveOutput() { 619 // This number is picked because it produces negative output if the values isn't ensured to be 620 // positive. See HBASE-18757 for more information. 621 long testValue = 549888460800L; 622 BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> { 623 return ByteBuffAllocator.NONE; 624 }, ByteBuffAllocator.HEAP); 625 assertEquals(testValue, bucketEntry.offset()); 626 } 627 628 @Test 629 public void testEvictionCount() throws InterruptedException { 630 int size = 100; 631 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 632 ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size); 633 HFileContext meta = new HFileContextBuilder().build(); 634 ByteBuffAllocator allocator = ByteBuffAllocator.HEAP; 635 HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 636 ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator); 637 HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 638 ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator); 639 640 BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0); 641 ByteBuffer actualBuffer = ByteBuffer.allocate(length); 642 ByteBuffer block1Buffer = ByteBuffer.allocate(length); 643 ByteBuffer block2Buffer = ByteBuffer.allocate(length); 644 blockWithNextBlockMetadata.serialize(block1Buffer, true); 645 blockWithoutNextBlockMetadata.serialize(block2Buffer, true); 646 647 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back. 648 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 649 block1Buffer); 650 651 waitUntilFlushedToBucket(cache, key); 652 653 assertEquals(0, cache.getStats().getEvictionCount()); 654 655 // evict call should return 1, but then eviction count be 0 656 assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount")); 657 assertEquals(0, cache.getStats().getEvictionCount()); 658 659 // add back 660 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 661 block1Buffer); 662 waitUntilFlushedToBucket(cache, key); 663 664 // should not increment 665 assertTrue(cache.evictBlock(key)); 666 assertEquals(0, cache.getStats().getEvictionCount()); 667 668 // add back 669 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 670 block1Buffer); 671 waitUntilFlushedToBucket(cache, key); 672 673 // should finally increment eviction count 674 cache.freeSpace("testing"); 675 assertEquals(1, cache.getStats().getEvictionCount()); 676 } 677 678 @Test 679 public void testCacheBlockNextBlockMetadataMissing() throws Exception { 680 int size = 100; 681 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 682 ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size); 683 HFileContext meta = new HFileContextBuilder().build(); 684 ByteBuffAllocator allocator = ByteBuffAllocator.HEAP; 685 HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 686 ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator); 687 HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 688 ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator); 689 690 BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0); 691 ByteBuffer actualBuffer = ByteBuffer.allocate(length); 692 ByteBuffer block1Buffer = ByteBuffer.allocate(length); 693 ByteBuffer block2Buffer = ByteBuffer.allocate(length); 694 blockWithNextBlockMetadata.serialize(block1Buffer, true); 695 blockWithoutNextBlockMetadata.serialize(block2Buffer, true); 696 697 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back. 698 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 699 block1Buffer); 700 701 waitUntilFlushedToBucket(cache, key); 702 assertNotNull(cache.backingMap.get(key)); 703 assertEquals(1, cache.backingMap.get(key).refCnt()); 704 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 705 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 706 707 // Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back. 708 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, 709 block1Buffer); 710 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 711 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 712 assertEquals(1, cache.backingMap.get(key).refCnt()); 713 714 // Clear and add blockWithoutNextBlockMetadata 715 assertTrue(cache.evictBlock(key)); 716 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 717 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 718 719 assertNull(cache.getBlock(key, false, false, false)); 720 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, 721 block2Buffer); 722 723 waitUntilFlushedToBucket(cache, key); 724 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 725 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 726 727 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace. 728 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 729 block1Buffer); 730 731 waitUntilFlushedToBucket(cache, key); 732 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 733 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 734 } 735 736 @Test 737 public void testRAMCache() { 738 int size = 100; 739 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 740 byte[] byteArr = new byte[length]; 741 ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size); 742 HFileContext meta = new HFileContextBuilder().build(); 743 744 RAMCache cache = new RAMCache(); 745 BlockCacheKey key1 = new BlockCacheKey("file-1", 1); 746 BlockCacheKey key2 = new BlockCacheKey("file-2", 2); 747 HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 748 HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP); 749 HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 750 HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP); 751 RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false); 752 RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false); 753 754 assertFalse(cache.containsKey(key1)); 755 assertNull(cache.putIfAbsent(key1, re1)); 756 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 757 758 assertNotNull(cache.putIfAbsent(key1, re2)); 759 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 760 assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 761 762 assertNull(cache.putIfAbsent(key2, re2)); 763 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 764 assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 765 766 cache.remove(key1); 767 assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 768 assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 769 770 cache.clear(); 771 assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 772 assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 773 } 774 775 @Test 776 public void testFreeBlockWhenIOEngineWriteFailure() throws IOException { 777 // initialize an block. 778 int size = 100, offset = 20; 779 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 780 ByteBuffer buf = ByteBuffer.allocate(length); 781 HFileContext meta = new HFileContextBuilder().build(); 782 HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 783 HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP); 784 785 // initialize an mocked ioengine. 786 IOEngine ioEngine = Mockito.mock(IOEngine.class); 787 when(ioEngine.usesSharedMemory()).thenReturn(false); 788 // Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong()); 789 Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class), 790 Mockito.anyLong()); 791 Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class), 792 Mockito.anyLong()); 793 794 // create an bucket allocator. 795 long availableSpace = 1024 * 1024 * 1024L; 796 BucketAllocator allocator = new BucketAllocator(availableSpace, null); 797 798 BlockCacheKey key = new BlockCacheKey("dummy", 1L); 799 RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false); 800 801 Assert.assertEquals(0, allocator.getUsedSize()); 802 try { 803 re.writeToCache(ioEngine, allocator, null, null, 804 ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE)); 805 Assert.fail(); 806 } catch (Exception e) { 807 } 808 Assert.assertEquals(0, allocator.getUsedSize()); 809 } 810 811 /** 812 * This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file 813 * could not be freed even if corresponding {@link HFileBlock} is evicted from 814 * {@link BucketCache}. 815 */ 816 @Test 817 public void testFreeBucketEntryRestoredFromFile() throws Exception { 818 BucketCache bucketCache = null; 819 try { 820 final Path dataTestDir = createAndGetTestDir(); 821 822 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 823 String persistencePath = dataTestDir + "/bucketNoRecycler.persistence"; 824 825 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 826 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 827 assertTrue(bucketCache.waitForCacheInitialization(10000)); 828 long usedByteSize = bucketCache.getAllocator().getUsedSize(); 829 assertEquals(0, usedByteSize); 830 831 HFileBlockPair[] hfileBlockPairs = 832 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 833 // Add blocks 834 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 835 bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock()); 836 } 837 838 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 839 cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(), 840 hfileBlockPair.getBlock(), false); 841 } 842 usedByteSize = bucketCache.getAllocator().getUsedSize(); 843 assertNotEquals(0, usedByteSize); 844 // persist cache to file 845 bucketCache.shutdown(); 846 assertTrue(new File(persistencePath).exists()); 847 848 // restore cache from file 849 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 850 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 851 assertTrue(bucketCache.waitForCacheInitialization(10000)); 852 assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); 853 854 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 855 BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName(); 856 bucketCache.evictBlock(blockCacheKey); 857 } 858 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 859 assertEquals(0, bucketCache.backingMap.size()); 860 } finally { 861 bucketCache.shutdown(); 862 HBASE_TESTING_UTILITY.cleanupTestDir(); 863 } 864 } 865 866 @Test 867 public void testBlockAdditionWaitWhenCache() throws Exception { 868 BucketCache bucketCache = null; 869 try { 870 final Path dataTestDir = createAndGetTestDir(); 871 872 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 873 String persistencePath = dataTestDir + "/bucketNoRecycler.persistence"; 874 875 Configuration config = HBASE_TESTING_UTILITY.getConfiguration(); 876 config.setLong(QUEUE_ADDITION_WAIT_TIME, 1000); 877 878 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 879 constructedBlockSizes, 1, 1, persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, config); 880 assertTrue(bucketCache.waitForCacheInitialization(10000)); 881 long usedByteSize = bucketCache.getAllocator().getUsedSize(); 882 assertEquals(0, usedByteSize); 883 884 HFileBlockPair[] hfileBlockPairs = 885 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10); 886 // Add blocks 887 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 888 bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, 889 true); 890 } 891 892 // Max wait for 10 seconds. 893 long timeout = 10000; 894 // Wait for blocks size to match the number of blocks. 895 while (bucketCache.backingMap.size() != 10) { 896 if (timeout <= 0) break; 897 Threads.sleep(100); 898 timeout -= 100; 899 } 900 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 901 assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName())); 902 } 903 usedByteSize = bucketCache.getAllocator().getUsedSize(); 904 assertNotEquals(0, usedByteSize); 905 // persist cache to file 906 bucketCache.shutdown(); 907 assertTrue(new File(persistencePath).exists()); 908 909 // restore cache from file 910 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 911 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 912 assertTrue(bucketCache.waitForCacheInitialization(10000)); 913 assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); 914 915 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 916 BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName(); 917 bucketCache.evictBlock(blockCacheKey); 918 } 919 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 920 assertEquals(0, bucketCache.backingMap.size()); 921 } finally { 922 if (bucketCache != null) { 923 bucketCache.shutdown(); 924 } 925 HBASE_TESTING_UTILITY.cleanupTestDir(); 926 } 927 } 928 929 @Test 930 public void testOnConfigurationChange() throws Exception { 931 BucketCache bucketCache = null; 932 try { 933 final Path dataTestDir = createAndGetTestDir(); 934 935 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 936 937 Configuration config = HBASE_TESTING_UTILITY.getConfiguration(); 938 939 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 940 constructedBlockSizes, 1, 1, null, DEFAULT_ERROR_TOLERATION_DURATION, config); 941 942 assertTrue(bucketCache.waitForCacheInitialization(10000)); 943 944 config.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f); 945 config.setFloat(MIN_FACTOR_CONFIG_NAME, 0.8f); 946 config.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.15f); 947 config.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.2f); 948 config.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.6f); 949 config.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f); 950 config.setLong(QUEUE_ADDITION_WAIT_TIME, 100); 951 config.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 500); 952 config.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, 1000); 953 954 bucketCache.onConfigurationChange(config); 955 956 assertEquals(0.9f, bucketCache.getAcceptableFactor(), 0.01); 957 assertEquals(0.8f, bucketCache.getMinFactor(), 0.01); 958 assertEquals(0.15f, bucketCache.getExtraFreeFactor(), 0.01); 959 assertEquals(0.2f, bucketCache.getSingleFactor(), 0.01); 960 assertEquals(0.6f, bucketCache.getMultiFactor(), 0.01); 961 assertEquals(0.2f, bucketCache.getMemoryFactor(), 0.01); 962 assertEquals(100L, bucketCache.getQueueAdditionWaitTime()); 963 assertEquals(500L, bucketCache.getBucketcachePersistInterval()); 964 assertEquals(1000L, bucketCache.getPersistenceChunkSize()); 965 966 } finally { 967 if (bucketCache != null) { 968 bucketCache.shutdown(); 969 } 970 HBASE_TESTING_UTILITY.cleanupTestDir(); 971 } 972 } 973 974 @Test 975 public void testNotifyFileCachingCompletedSuccess() throws Exception { 976 BucketCache bucketCache = null; 977 try { 978 Path filePath = 979 new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testNotifyFileCachingCompletedSuccess"); 980 bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, false); 981 if (bucketCache.getStats().getFailedInserts() > 0) { 982 LOG.info("There were {} fail inserts, " 983 + "will assert if total blocks in backingMap equals (10 - failInserts) " 984 + "and file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts()); 985 assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size()); 986 assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 987 } else { 988 assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 989 } 990 } finally { 991 if (bucketCache != null) { 992 bucketCache.shutdown(); 993 } 994 HBASE_TESTING_UTILITY.cleanupTestDir(); 995 } 996 } 997 998 @Test 999 public void testNotifyFileCachingCompletedForEncodedDataSuccess() throws Exception { 1000 BucketCache bucketCache = null; 1001 try { 1002 Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), 1003 "testNotifyFileCachingCompletedForEncodedDataSuccess"); 1004 bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, true); 1005 if (bucketCache.getStats().getFailedInserts() > 0) { 1006 LOG.info("There were {} fail inserts, " 1007 + "will assert if total blocks in backingMap equals (10 - failInserts) " 1008 + "and file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts()); 1009 assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size()); 1010 assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 1011 } else { 1012 assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 1013 } 1014 } finally { 1015 if (bucketCache != null) { 1016 bucketCache.shutdown(); 1017 } 1018 HBASE_TESTING_UTILITY.cleanupTestDir(); 1019 } 1020 } 1021 1022 @Test 1023 public void testNotifyFileCachingCompletedNotAllCached() throws Exception { 1024 BucketCache bucketCache = null; 1025 try { 1026 Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), 1027 "testNotifyFileCachingCompletedNotAllCached"); 1028 // Deliberately passing more blocks than we have created to test that 1029 // notifyFileCachingCompleted will not consider the file fully cached 1030 bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 12, false); 1031 assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName())); 1032 } finally { 1033 if (bucketCache != null) { 1034 bucketCache.shutdown(); 1035 } 1036 HBASE_TESTING_UTILITY.cleanupTestDir(); 1037 } 1038 } 1039 1040 private BucketCache testNotifyFileCachingCompletedForTenBlocks(Path filePath, 1041 int totalBlocksToCheck, boolean encoded) throws Exception { 1042 final Path dataTestDir = createAndGetTestDir(); 1043 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 1044 BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 1045 constructedBlockSizes, 1, 1, null); 1046 assertTrue(bucketCache.waitForCacheInitialization(10000)); 1047 long usedByteSize = bucketCache.getAllocator().getUsedSize(); 1048 assertEquals(0, usedByteSize); 1049 HFileBlockPair[] hfileBlockPairs = 1050 CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, filePath, encoded); 1051 // Add blocks 1052 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 1053 bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, true); 1054 } 1055 bucketCache.notifyFileCachingCompleted(filePath, totalBlocksToCheck, totalBlocksToCheck, 1056 totalBlocksToCheck * constructedBlockSize); 1057 return bucketCache; 1058 } 1059 1060 @Test 1061 public void testEvictOrphansOutOfGracePeriod() throws Exception { 1062 BucketCache bucketCache = testEvictOrphans(0); 1063 assertEquals(10, bucketCache.getBackingMap().size()); 1064 assertEquals(0, bucketCache.blocksByHFile.stream() 1065 .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count()); 1066 } 1067 1068 @Test 1069 public void testEvictOrphansWithinGracePeriod() throws Exception { 1070 BucketCache bucketCache = testEvictOrphans(60 * 60 * 1000L); 1071 assertEquals(18, bucketCache.getBackingMap().size()); 1072 assertTrue(bucketCache.blocksByHFile.stream() 1073 .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count() > 0); 1074 } 1075 1076 private BucketCache testEvictOrphans(long orphanEvictionGracePeriod) throws Exception { 1077 Path validFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-valid"); 1078 Path orphanFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-orphan"); 1079 Map<String, HRegion> onlineRegions = new HashMap<>(); 1080 List<HStore> stores = new ArrayList<>(); 1081 Collection<HStoreFile> storeFiles = new ArrayList<>(); 1082 HRegion mockedRegion = mock(HRegion.class); 1083 HStore mockedStore = mock(HStore.class); 1084 HStoreFile mockedStoreFile = mock(HStoreFile.class); 1085 when(mockedStoreFile.getPath()).thenReturn(validFile); 1086 storeFiles.add(mockedStoreFile); 1087 when(mockedStore.getStorefiles()).thenReturn(storeFiles); 1088 stores.add(mockedStore); 1089 when(mockedRegion.getStores()).thenReturn(stores); 1090 onlineRegions.put("mocked_region", mockedRegion); 1091 HBASE_TESTING_UTILITY.getConfiguration().setDouble(MIN_FACTOR_CONFIG_NAME, 0.99); 1092 HBASE_TESTING_UTILITY.getConfiguration().setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1); 1093 HBASE_TESTING_UTILITY.getConfiguration().setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01); 1094 HBASE_TESTING_UTILITY.getConfiguration().setLong(BLOCK_ORPHAN_GRACE_PERIOD, 1095 orphanEvictionGracePeriod); 1096 BucketCache bucketCache = new BucketCache(ioEngineName, (constructedBlockSize + 1024) * 21, 1097 constructedBlockSize, new int[] { constructedBlockSize + 1024 }, 1, 1, null, 60 * 1000, 1098 HBASE_TESTING_UTILITY.getConfiguration(), onlineRegions); 1099 HFileBlockPair[] validBlockPairs = 1100 CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, validFile); 1101 HFileBlockPair[] orphanBlockPairs = 1102 CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, orphanFile); 1103 for (HFileBlockPair pair : validBlockPairs) { 1104 bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true); 1105 } 1106 waitUntilAllFlushedToBucket(bucketCache); 1107 assertEquals(10, bucketCache.getBackingMap().size()); 1108 bucketCache.freeSpace("test"); 1109 assertEquals(10, bucketCache.getBackingMap().size()); 1110 for (HFileBlockPair pair : orphanBlockPairs) { 1111 bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true); 1112 } 1113 waitUntilAllFlushedToBucket(bucketCache); 1114 assertEquals(20, bucketCache.getBackingMap().size()); 1115 bucketCache.freeSpace("test"); 1116 return bucketCache; 1117 } 1118}