001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertTrue; 026import static org.mockito.Mockito.when; 027 028import java.io.File; 029import java.io.IOException; 030import java.nio.ByteBuffer; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.List; 034import java.util.Map; 035import java.util.Set; 036import java.util.concurrent.ThreadLocalRandom; 037import java.util.concurrent.locks.ReentrantReadWriteLock; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.hbase.HBaseClassTestRule; 041import org.apache.hadoop.hbase.HBaseConfiguration; 042import org.apache.hadoop.hbase.HBaseTestingUtility; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.io.ByteBuffAllocator; 045import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 046import org.apache.hadoop.hbase.io.hfile.BlockType; 047import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; 048import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair; 049import org.apache.hadoop.hbase.io.hfile.Cacheable; 050import org.apache.hadoop.hbase.io.hfile.HFileBlock; 051import org.apache.hadoop.hbase.io.hfile.HFileContext; 052import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 053import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo; 054import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics; 055import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache; 056import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; 057import org.apache.hadoop.hbase.nio.ByteBuff; 058import org.apache.hadoop.hbase.testclassification.IOTests; 059import org.apache.hadoop.hbase.testclassification.LargeTests; 060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 061import org.apache.hadoop.hbase.util.Pair; 062import org.apache.hadoop.hbase.util.Threads; 063import org.junit.After; 064import org.junit.Assert; 065import org.junit.Before; 066import org.junit.ClassRule; 067import org.junit.Test; 068import org.junit.experimental.categories.Category; 069import org.junit.runner.RunWith; 070import org.junit.runners.Parameterized; 071import org.mockito.Mockito; 072 073import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 074 075/** 076 * Basic test of BucketCache.Puts and gets. 077 * <p> 078 * Tests will ensure that blocks' data correctness under several threads concurrency 079 */ 080@RunWith(Parameterized.class) 081@Category({ IOTests.class, LargeTests.class }) 082public class TestBucketCache { 083 084 @ClassRule 085 public static final HBaseClassTestRule CLASS_RULE = 086 HBaseClassTestRule.forClass(TestBucketCache.class); 087 088 @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}") 089 public static Iterable<Object[]> data() { 090 return Arrays.asList(new Object[][] { { 8192, null }, // TODO: why is 8k the default blocksize 091 // for these tests? 092 { 16 * 1024, 093 new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, 094 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 095 128 * 1024 + 1024 } } }); 096 } 097 098 @Parameterized.Parameter(0) 099 public int constructedBlockSize; 100 101 @Parameterized.Parameter(1) 102 public int[] constructedBlockSizes; 103 104 BucketCache cache; 105 final int CACHE_SIZE = 1000000; 106 final int NUM_BLOCKS = 100; 107 final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS; 108 final int NUM_THREADS = 100; 109 final int NUM_QUERIES = 10000; 110 111 final long capacitySize = 32 * 1024 * 1024; 112 final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; 113 final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; 114 private String ioEngineName = "offheap"; 115 116 private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility(); 117 118 private static class MockedBucketCache extends BucketCache { 119 120 public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 121 int writerThreads, int writerQLen, String persistencePath) throws IOException { 122 super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen, 123 persistencePath); 124 } 125 126 @Override 127 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { 128 super.cacheBlock(cacheKey, buf, inMemory); 129 } 130 131 @Override 132 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 133 super.cacheBlock(cacheKey, buf); 134 } 135 } 136 137 @Before 138 public void setup() throws IOException { 139 cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize, 140 constructedBlockSizes, writeThreads, writerQLen, null); 141 } 142 143 @After 144 public void tearDown() { 145 cache.shutdown(); 146 } 147 148 /** 149 * Test Utility to create test dir and return name 150 * @return return name of created dir 151 * @throws IOException throws IOException 152 */ 153 private Path createAndGetTestDir() throws IOException { 154 final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir(); 155 HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir); 156 return testDir; 157 } 158 159 /** 160 * Return a random element from {@code a}. 161 */ 162 private static <T> T randFrom(List<T> a) { 163 return a.get(ThreadLocalRandom.current().nextInt(a.size())); 164 } 165 166 @Test 167 public void testBucketAllocator() throws BucketAllocatorException { 168 BucketAllocator mAllocator = cache.getAllocator(); 169 /* 170 * Test the allocator first 171 */ 172 final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024); 173 174 boolean full = false; 175 ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>(); 176 // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until 177 // the cache is completely filled. 178 List<Integer> tmp = new ArrayList<>(BLOCKSIZES); 179 while (!full) { 180 Integer blockSize = null; 181 try { 182 blockSize = randFrom(tmp); 183 allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize)); 184 } catch (CacheFullException cfe) { 185 tmp.remove(blockSize); 186 if (tmp.isEmpty()) full = true; 187 } 188 } 189 190 for (Integer blockSize : BLOCKSIZES) { 191 BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize); 192 IndexStatistics indexStatistics = bucketSizeInfo.statistics(); 193 assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount()); 194 195 // we know the block sizes above are multiples of 1024, but default bucket sizes give an 196 // additional 1024 on top of that so this counts towards fragmentation in our test 197 // real life may have worse fragmentation because blocks may not be perfectly sized to block 198 // size, given encoding/compression and large rows 199 assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes()); 200 } 201 202 mAllocator.logDebugStatistics(); 203 204 for (Pair<Long, Integer> allocation : allocations) { 205 assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()), 206 mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond())); 207 } 208 assertEquals(0, mAllocator.getUsedSize()); 209 } 210 211 @Test 212 public void testCacheSimple() throws Exception { 213 CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES); 214 } 215 216 @Test 217 public void testCacheMultiThreadedSingleKey() throws Exception { 218 CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES); 219 } 220 221 @Test 222 public void testHeapSizeChanges() throws Exception { 223 cache.stopWriterThreads(); 224 CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE); 225 } 226 227 public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) 228 throws InterruptedException { 229 while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { 230 Thread.sleep(100); 231 } 232 Thread.sleep(1000); 233 } 234 235 public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException { 236 while (!cache.ramCache.isEmpty()) { 237 Thread.sleep(100); 238 } 239 Thread.sleep(1000); 240 } 241 242 // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer 243 // threads will flush it to the bucket and put reference entry in backingMap. 244 private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, 245 Cacheable block, boolean waitWhenCache) throws InterruptedException { 246 cache.cacheBlock(cacheKey, block, false, waitWhenCache); 247 waitUntilFlushedToBucket(cache, cacheKey); 248 } 249 250 @Test 251 public void testMemoryLeak() throws Exception { 252 final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L); 253 cacheAndWaitUntilFlushedToBucket(cache, cacheKey, 254 new CacheTestUtils.ByteArrayCacheable(new byte[10]), true); 255 long lockId = cache.backingMap.get(cacheKey).offset(); 256 ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId); 257 lock.writeLock().lock(); 258 Thread evictThread = new Thread("evict-block") { 259 @Override 260 public void run() { 261 cache.evictBlock(cacheKey); 262 } 263 }; 264 evictThread.start(); 265 cache.offsetLock.waitForWaiters(lockId, 1); 266 cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true); 267 assertEquals(0, cache.getBlockCount()); 268 cacheAndWaitUntilFlushedToBucket(cache, cacheKey, 269 new CacheTestUtils.ByteArrayCacheable(new byte[10]), true); 270 assertEquals(1, cache.getBlockCount()); 271 lock.writeLock().unlock(); 272 evictThread.join(); 273 /** 274 * <pre> 275 * The asserts here before HBASE-21957 are: 276 * assertEquals(1L, cache.getBlockCount()); 277 * assertTrue(cache.getCurrentSize() > 0L); 278 * assertTrue("We should have a block!", cache.iterator().hasNext()); 279 * 280 * The asserts here after HBASE-21957 are: 281 * assertEquals(0, cache.getBlockCount()); 282 * assertEquals(cache.getCurrentSize(), 0L); 283 * 284 * I think the asserts before HBASE-21957 is more reasonable,because 285 * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry} 286 * it had seen, and newly added Block after the {@link BucketEntry} 287 * it had seen should not be evicted. 288 * </pre> 289 */ 290 assertEquals(1L, cache.getBlockCount()); 291 assertTrue(cache.getCurrentSize() > 0L); 292 assertTrue("We should have a block!", cache.iterator().hasNext()); 293 } 294 295 @Test 296 public void testRetrieveFromFile() throws Exception { 297 Path testDir = createAndGetTestDir(); 298 String ioEngineName = "file:" + testDir + "/bucket.cache"; 299 testRetrievalUtils(testDir, ioEngineName); 300 int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; 301 String persistencePath = testDir + "/bucket.persistence"; 302 BucketCache bucketCache = null; 303 try { 304 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 305 smallBucketSizes, writeThreads, writerQLen, persistencePath); 306 assertFalse(new File(persistencePath).exists()); 307 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 308 assertEquals(0, bucketCache.backingMap.size()); 309 } finally { 310 bucketCache.shutdown(); 311 HBASE_TESTING_UTILITY.cleanupTestDir(); 312 } 313 } 314 315 @Test 316 public void testRetrieveFromMMap() throws Exception { 317 final Path testDir = createAndGetTestDir(); 318 final String ioEngineName = "mmap:" + testDir + "/bucket.cache"; 319 testRetrievalUtils(testDir, ioEngineName); 320 } 321 322 @Test 323 public void testRetrieveFromPMem() throws Exception { 324 final Path testDir = createAndGetTestDir(); 325 final String ioEngineName = "pmem:" + testDir + "/bucket.cache"; 326 testRetrievalUtils(testDir, ioEngineName); 327 int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; 328 String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 329 BucketCache bucketCache = null; 330 try { 331 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 332 smallBucketSizes, writeThreads, writerQLen, persistencePath); 333 assertFalse(new File(persistencePath).exists()); 334 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 335 assertEquals(0, bucketCache.backingMap.size()); 336 } finally { 337 bucketCache.shutdown(); 338 HBASE_TESTING_UTILITY.cleanupTestDir(); 339 } 340 } 341 342 private void testRetrievalUtils(Path testDir, String ioEngineName) 343 throws IOException, InterruptedException { 344 final String persistencePath = 345 testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 346 BucketCache bucketCache = null; 347 try { 348 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 349 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 350 long usedSize = bucketCache.getAllocator().getUsedSize(); 351 assertEquals(0, usedSize); 352 HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 353 for (HFileBlockPair block : blocks) { 354 bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); 355 } 356 for (HFileBlockPair block : blocks) { 357 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), 358 false); 359 } 360 usedSize = bucketCache.getAllocator().getUsedSize(); 361 assertNotEquals(0, usedSize); 362 bucketCache.shutdown(); 363 assertTrue(new File(persistencePath).exists()); 364 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 365 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 366 assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); 367 } finally { 368 if (bucketCache != null) { 369 bucketCache.shutdown(); 370 } 371 } 372 assertTrue(new File(persistencePath).exists()); 373 } 374 375 @Test 376 public void testRetrieveUnsupportedIOE() throws Exception { 377 try { 378 final Path testDir = createAndGetTestDir(); 379 final String ioEngineName = testDir + "/bucket.cache"; 380 testRetrievalUtils(testDir, ioEngineName); 381 Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!"); 382 } catch (IllegalArgumentException e) { 383 Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, " 384 + "files:, mmap: or offheap", e.getMessage()); 385 } 386 } 387 388 @Test 389 public void testRetrieveFromMultipleFiles() throws Exception { 390 final Path testDirInitial = createAndGetTestDir(); 391 final Path newTestDir = new HBaseTestingUtility().getDataTestDir(); 392 HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir); 393 String ioEngineName = 394 new StringBuilder("files:").append(testDirInitial).append("/bucket1.cache") 395 .append(FileIOEngine.FILE_DELIMITER).append(newTestDir).append("/bucket2.cache").toString(); 396 testRetrievalUtils(testDirInitial, ioEngineName); 397 int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 }; 398 String persistencePath = testDirInitial + "/bucket.persistence"; 399 BucketCache bucketCache = null; 400 try { 401 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 402 smallBucketSizes, writeThreads, writerQLen, persistencePath); 403 assertFalse(new File(persistencePath).exists()); 404 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 405 assertEquals(0, bucketCache.backingMap.size()); 406 } finally { 407 bucketCache.shutdown(); 408 HBASE_TESTING_UTILITY.cleanupTestDir(); 409 } 410 } 411 412 @Test 413 public void testRetrieveFromFileWithoutPersistence() throws Exception { 414 BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 415 constructedBlockSizes, writeThreads, writerQLen, null); 416 try { 417 final Path testDir = createAndGetTestDir(); 418 String ioEngineName = "file:" + testDir + "/bucket.cache"; 419 long usedSize = bucketCache.getAllocator().getUsedSize(); 420 assertEquals(0, usedSize); 421 HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 422 for (HFileBlockPair block : blocks) { 423 bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); 424 } 425 for (HFileBlockPair block : blocks) { 426 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), 427 false); 428 } 429 usedSize = bucketCache.getAllocator().getUsedSize(); 430 assertNotEquals(0, usedSize); 431 bucketCache.shutdown(); 432 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 433 constructedBlockSizes, writeThreads, writerQLen, null); 434 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 435 } finally { 436 bucketCache.shutdown(); 437 HBASE_TESTING_UTILITY.cleanupTestDir(); 438 } 439 } 440 441 @Test 442 public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException { 443 long availableSpace = 20 * 1024L * 1024 * 1024; 444 int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 }; 445 BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes); 446 assertTrue(allocator.getBuckets().length > 0); 447 } 448 449 @Test 450 public void testGetPartitionSize() throws IOException { 451 // Test default values 452 validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR, 453 BucketCache.DEFAULT_MIN_FACTOR); 454 455 Configuration conf = HBaseConfiguration.create(); 456 conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f); 457 conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f); 458 conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f); 459 conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f); 460 461 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 462 constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); 463 464 validateGetPartitionSize(cache, 0.1f, 0.5f); 465 validateGetPartitionSize(cache, 0.7f, 0.5f); 466 validateGetPartitionSize(cache, 0.2f, 0.5f); 467 } 468 469 @Test 470 public void testCacheSizeCapacity() throws IOException { 471 // Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE 472 validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR, 473 BucketCache.DEFAULT_MIN_FACTOR); 474 Configuration conf = HBaseConfiguration.create(); 475 conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f); 476 conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f); 477 conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f); 478 conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f); 479 try { 480 new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads, 481 writerQLen, null, 100, conf); 482 Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!"); 483 } catch (IllegalArgumentException e) { 484 Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage()); 485 } 486 } 487 488 @Test 489 public void testValidBucketCacheConfigs() throws IOException { 490 Configuration conf = HBaseConfiguration.create(); 491 conf.setFloat(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, 0.9f); 492 conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f); 493 conf.setFloat(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f); 494 conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f); 495 conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f); 496 conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f); 497 498 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 499 constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); 500 501 assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f, 502 cache.getAcceptableFactor(), 0); 503 assertEquals(BucketCache.MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, 504 cache.getMinFactor(), 0); 505 assertEquals(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, 506 cache.getExtraFreeFactor(), 0); 507 assertEquals(BucketCache.SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f, 508 cache.getSingleFactor(), 0); 509 assertEquals(BucketCache.MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f, 510 cache.getMultiFactor(), 0); 511 assertEquals(BucketCache.MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f, 512 cache.getMemoryFactor(), 0); 513 } 514 515 @Test 516 public void testInvalidAcceptFactorConfig() throws IOException { 517 float[] configValues = { -1f, 0.2f, 0.86f, 1.05f }; 518 boolean[] expectedOutcomes = { false, false, true, false }; 519 Map<String, float[]> configMappings = 520 ImmutableMap.of(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, configValues); 521 Configuration conf = HBaseConfiguration.create(); 522 checkConfigValues(conf, configMappings, expectedOutcomes); 523 } 524 525 @Test 526 public void testInvalidMinFactorConfig() throws IOException { 527 float[] configValues = { -1f, 0f, 0.96f, 1.05f }; 528 // throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0 529 boolean[] expectedOutcomes = { false, true, false, false }; 530 Map<String, float[]> configMappings = 531 ImmutableMap.of(BucketCache.MIN_FACTOR_CONFIG_NAME, configValues); 532 Configuration conf = HBaseConfiguration.create(); 533 checkConfigValues(conf, configMappings, expectedOutcomes); 534 } 535 536 @Test 537 public void testInvalidExtraFreeFactorConfig() throws IOException { 538 float[] configValues = { -1f, 0f, 0.2f, 1.05f }; 539 // throws due to <0, in expected range, in expected range, config can be > 1.0 540 boolean[] expectedOutcomes = { false, true, true, true }; 541 Map<String, float[]> configMappings = 542 ImmutableMap.of(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, configValues); 543 Configuration conf = HBaseConfiguration.create(); 544 checkConfigValues(conf, configMappings, expectedOutcomes); 545 } 546 547 @Test 548 public void testInvalidCacheSplitFactorConfig() throws IOException { 549 float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f }; 550 float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f }; 551 float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f }; 552 // All configs add up to 1.0 and are between 0 and 1.0, configs don't add to 1.0, configs can't 553 // be negative, configs don't add to 1.0 554 boolean[] expectedOutcomes = { true, false, false, false }; 555 Map<String, 556 float[]> configMappings = ImmutableMap.of(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 557 singleFactorConfigValues, BucketCache.MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues, 558 BucketCache.MEMORY_FACTOR_CONFIG_NAME, memoryFactorConfigValues); 559 Configuration conf = HBaseConfiguration.create(); 560 checkConfigValues(conf, configMappings, expectedOutcomes); 561 } 562 563 private void checkConfigValues(Configuration conf, Map<String, float[]> configMap, 564 boolean[] expectSuccess) throws IOException { 565 Set<String> configNames = configMap.keySet(); 566 for (int i = 0; i < expectSuccess.length; i++) { 567 try { 568 for (String configName : configNames) { 569 conf.setFloat(configName, configMap.get(configName)[i]); 570 } 571 BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 572 constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); 573 assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i] 574 + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]); 575 } catch (IllegalArgumentException e) { 576 assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i] 577 + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]); 578 } 579 } 580 } 581 582 private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor, 583 float minFactor) { 584 long expectedOutput = 585 (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor); 586 assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor)); 587 } 588 589 @Test 590 public void testOffsetProducesPositiveOutput() { 591 // This number is picked because it produces negative output if the values isn't ensured to be 592 // positive. See HBASE-18757 for more information. 593 long testValue = 549888460800L; 594 BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> { 595 return ByteBuffAllocator.NONE; 596 }, ByteBuffAllocator.HEAP); 597 assertEquals(testValue, bucketEntry.offset()); 598 } 599 600 @Test 601 public void testEvictionCount() throws InterruptedException { 602 int size = 100; 603 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 604 ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size); 605 HFileContext meta = new HFileContextBuilder().build(); 606 ByteBuffAllocator allocator = ByteBuffAllocator.HEAP; 607 HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 608 ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator); 609 HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 610 ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator); 611 612 BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0); 613 ByteBuffer actualBuffer = ByteBuffer.allocate(length); 614 ByteBuffer block1Buffer = ByteBuffer.allocate(length); 615 ByteBuffer block2Buffer = ByteBuffer.allocate(length); 616 blockWithNextBlockMetadata.serialize(block1Buffer, true); 617 blockWithoutNextBlockMetadata.serialize(block2Buffer, true); 618 619 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back. 620 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 621 block1Buffer); 622 623 waitUntilFlushedToBucket(cache, key); 624 625 assertEquals(0, cache.getStats().getEvictionCount()); 626 627 // evict call should return 1, but then eviction count be 0 628 assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount")); 629 assertEquals(0, cache.getStats().getEvictionCount()); 630 631 // add back 632 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 633 block1Buffer); 634 waitUntilFlushedToBucket(cache, key); 635 636 // should not increment 637 assertTrue(cache.evictBlock(key)); 638 assertEquals(0, cache.getStats().getEvictionCount()); 639 640 // add back 641 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 642 block1Buffer); 643 waitUntilFlushedToBucket(cache, key); 644 645 // should finally increment eviction count 646 cache.freeSpace("testing"); 647 assertEquals(1, cache.getStats().getEvictionCount()); 648 } 649 650 @Test 651 public void testCacheBlockNextBlockMetadataMissing() throws Exception { 652 int size = 100; 653 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 654 ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size); 655 HFileContext meta = new HFileContextBuilder().build(); 656 ByteBuffAllocator allocator = ByteBuffAllocator.HEAP; 657 HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 658 ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator); 659 HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 660 ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator); 661 662 BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0); 663 ByteBuffer actualBuffer = ByteBuffer.allocate(length); 664 ByteBuffer block1Buffer = ByteBuffer.allocate(length); 665 ByteBuffer block2Buffer = ByteBuffer.allocate(length); 666 blockWithNextBlockMetadata.serialize(block1Buffer, true); 667 blockWithoutNextBlockMetadata.serialize(block2Buffer, true); 668 669 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back. 670 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 671 block1Buffer); 672 673 waitUntilFlushedToBucket(cache, key); 674 assertNotNull(cache.backingMap.get(key)); 675 assertEquals(1, cache.backingMap.get(key).refCnt()); 676 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 677 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 678 679 // Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back. 680 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, 681 block1Buffer); 682 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 683 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 684 assertEquals(1, cache.backingMap.get(key).refCnt()); 685 686 // Clear and add blockWithoutNextBlockMetadata 687 assertTrue(cache.evictBlock(key)); 688 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 689 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 690 691 assertNull(cache.getBlock(key, false, false, false)); 692 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, 693 block2Buffer); 694 695 waitUntilFlushedToBucket(cache, key); 696 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 697 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 698 699 // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace. 700 CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, 701 block1Buffer); 702 703 waitUntilFlushedToBucket(cache, key); 704 assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt()); 705 assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt()); 706 } 707 708 @Test 709 public void testRAMCache() { 710 int size = 100; 711 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 712 byte[] byteArr = new byte[length]; 713 ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size); 714 HFileContext meta = new HFileContextBuilder().build(); 715 716 RAMCache cache = new RAMCache(); 717 BlockCacheKey key1 = new BlockCacheKey("file-1", 1); 718 BlockCacheKey key2 = new BlockCacheKey("file-2", 2); 719 HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 720 HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP); 721 HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 722 HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP); 723 RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false); 724 RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false); 725 726 assertFalse(cache.containsKey(key1)); 727 assertNull(cache.putIfAbsent(key1, re1)); 728 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 729 730 assertNotNull(cache.putIfAbsent(key1, re2)); 731 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 732 assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 733 734 assertNull(cache.putIfAbsent(key2, re2)); 735 assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 736 assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 737 738 cache.remove(key1); 739 assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 740 assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 741 742 cache.clear(); 743 assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt()); 744 assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt()); 745 } 746 747 @Test 748 public void testFreeBlockWhenIOEngineWriteFailure() throws IOException { 749 // initialize an block. 750 int size = 100, offset = 20; 751 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 752 ByteBuffer buf = ByteBuffer.allocate(length); 753 HFileContext meta = new HFileContextBuilder().build(); 754 HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 755 HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP); 756 757 // initialize an mocked ioengine. 758 IOEngine ioEngine = Mockito.mock(IOEngine.class); 759 when(ioEngine.usesSharedMemory()).thenReturn(false); 760 // Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong()); 761 Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class), 762 Mockito.anyLong()); 763 Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class), 764 Mockito.anyLong()); 765 766 // create an bucket allocator. 767 long availableSpace = 1024 * 1024 * 1024L; 768 BucketAllocator allocator = new BucketAllocator(availableSpace, null); 769 770 BlockCacheKey key = new BlockCacheKey("dummy", 1L); 771 RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false); 772 773 Assert.assertEquals(0, allocator.getUsedSize()); 774 try { 775 re.writeToCache(ioEngine, allocator, null, null, 776 ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE)); 777 Assert.fail(); 778 } catch (Exception e) { 779 } 780 Assert.assertEquals(0, allocator.getUsedSize()); 781 } 782 783 /** 784 * This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file 785 * could not be freed even if corresponding {@link HFileBlock} is evicted from 786 * {@link BucketCache}. 787 */ 788 @Test 789 public void testFreeBucketEntryRestoredFromFile() throws Exception { 790 BucketCache bucketCache = null; 791 try { 792 final Path dataTestDir = createAndGetTestDir(); 793 794 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 795 String persistencePath = dataTestDir + "/bucketNoRecycler.persistence"; 796 797 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 798 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 799 long usedByteSize = bucketCache.getAllocator().getUsedSize(); 800 assertEquals(0, usedByteSize); 801 802 HFileBlockPair[] hfileBlockPairs = 803 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 804 // Add blocks 805 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 806 bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock()); 807 } 808 809 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 810 cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(), 811 hfileBlockPair.getBlock(), false); 812 } 813 usedByteSize = bucketCache.getAllocator().getUsedSize(); 814 assertNotEquals(0, usedByteSize); 815 // persist cache to file 816 bucketCache.shutdown(); 817 assertTrue(new File(persistencePath).exists()); 818 819 // restore cache from file 820 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 821 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 822 assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); 823 824 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 825 BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName(); 826 bucketCache.evictBlock(blockCacheKey); 827 } 828 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 829 assertEquals(0, bucketCache.backingMap.size()); 830 } finally { 831 bucketCache.shutdown(); 832 HBASE_TESTING_UTILITY.cleanupTestDir(); 833 } 834 } 835 836 @Test 837 public void testBlockAdditionWaitWhenCache() throws Exception { 838 BucketCache bucketCache = null; 839 try { 840 final Path dataTestDir = createAndGetTestDir(); 841 842 String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; 843 String persistencePath = dataTestDir + "/bucketNoRecycler.persistence"; 844 845 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 846 constructedBlockSizes, 1, 1, persistencePath); 847 long usedByteSize = bucketCache.getAllocator().getUsedSize(); 848 assertEquals(0, usedByteSize); 849 850 HFileBlockPair[] hfileBlockPairs = 851 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10); 852 // Add blocks 853 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 854 bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, 855 true); 856 } 857 858 // Max wait for 10 seconds. 859 long timeout = 10000; 860 // Wait for blocks size to match the number of blocks. 861 while (bucketCache.backingMap.size() != 10) { 862 if (timeout <= 0) break; 863 Threads.sleep(100); 864 timeout -= 100; 865 } 866 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 867 assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName())); 868 } 869 usedByteSize = bucketCache.getAllocator().getUsedSize(); 870 assertNotEquals(0, usedByteSize); 871 // persist cache to file 872 bucketCache.shutdown(); 873 assertTrue(new File(persistencePath).exists()); 874 875 // restore cache from file 876 bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, 877 constructedBlockSizes, writeThreads, writerQLen, persistencePath); 878 assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); 879 880 for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { 881 BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName(); 882 bucketCache.evictBlock(blockCacheKey); 883 } 884 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 885 assertEquals(0, bucketCache.backingMap.size()); 886 } finally { 887 if (bucketCache != null) { 888 bucketCache.shutdown(); 889 } 890 HBASE_TESTING_UTILITY.cleanupTestDir(); 891 } 892 } 893}