/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BLOCK_ORPHAN_GRACE_PERIOD;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_MIN_FACTOR;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_SINGLE_FACTOR;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MEMORY_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MULTI_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.SINGLE_FACTOR_CONFIG_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * Basic test of BucketCache: puts and gets.
 * <p>
 * Tests ensure the correctness of block data under concurrent access from several threads.
 */
@RunWith(Parameterized.class)
@Category({ IOTests.class, LargeTests.class })
public class TestBucketCache {

  private static final Logger LOG = LoggerFactory.getLogger(TestBucketCache.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBucketCache.class);

  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] { { 8192, null }, // TODO: why is 8k the default blocksize
                                                          // for these tests?
      { 16 * 1024,
        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
          128 * 1024 + 1024 } } });
  }

  @Parameterized.Parameter(0)
  public int constructedBlockSize;

  @Parameterized.Parameter(1)
  public int[] constructedBlockSizes;

  BucketCache cache;
  final int CACHE_SIZE = 1000000;
  final int NUM_BLOCKS = 100;
  final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
  final int NUM_THREADS = 100;
  final int NUM_QUERIES = 10000;

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
  private String ioEngineName = "offheap";

  private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil();

  private static class MockedBucketCache extends BucketCache {

    public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreads, int writerQLen, String persistencePath) throws IOException {
      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
        persistencePath);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
      super.cacheBlock(cacheKey, buf, inMemory);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
      super.cacheBlock(cacheKey, buf);
    }
  }

  @Before
  public void setup() throws IOException {
    cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null);
  }

  @After
  public void tearDown() {
    cache.shutdown();
  }

  /**
   * Test utility to create the test directory and return its path.
   * @return path of the created directory
   * @throws IOException if the directory cannot be created
   */
  private Path createAndGetTestDir() throws IOException {
    final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
    return testDir;
  }

  /**
   * Return a random element from {@code a}.
   */
  private static <T> T randFrom(List<T> a) {
    return a.get(ThreadLocalRandom.current().nextInt(a.size()));
  }

  @Test
  public void testBucketAllocator() throws BucketAllocatorException {
    BucketAllocator mAllocator = cache.getAllocator();
    /*
     * Test the allocator first
     */
    final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);

    boolean full = false;
    ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>();
    // Fill the allocated extents by choosing a random blocksize, continuing until the cache is
    // completely filled.
    List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
    while (!full) {
      Integer blockSize = null;
      try {
        blockSize = randFrom(tmp);
        allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize));
      } catch (CacheFullException cfe) {
        tmp.remove(blockSize);
        if (tmp.isEmpty()) full = true;
      }
    }

    for (Integer blockSize : BLOCKSIZES) {
      BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
      IndexStatistics indexStatistics = bucketSizeInfo.statistics();
      assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());

      // We know the block sizes above are multiples of 1024, but the default bucket sizes add an
      // additional 1024 on top of that, so this counts towards fragmentation in our test.
      // Real life may see worse fragmentation, because blocks may not be perfectly sized to the
      // block size, given encoding/compression and large rows.
      assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes());
    }

    mAllocator.logDebugStatistics();

    for (Pair<Long, Integer> allocation : allocations) {
      assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()),
        mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond()));
    }
    assertEquals(0, mAllocator.getUsedSize());
  }
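
  /*
   * A worked example of the fragmentation assert in testBucketAllocator above (a sketch, assuming
   * the default bucket sizes, which add 1024 bytes of headroom on top of each block size):
   *   requested block:  4 * 1024 = 4096 bytes
   *   bucket item size: 4 * 1024 + 1024 = 5120 bytes
   *   waste per block:  5120 - 4096 = 1024 bytes
   * hence fragmentationBytes() == 1024 * totalCount() once every free slot is used.
   */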

  @Test
  public void testCacheSimple() throws Exception {
    CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
  }

  @Test
  public void testCacheMultiThreadedSingleKey() throws Exception {
    CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES);
  }

  @Test
  public void testHeapSizeChanges() throws Exception {
    cache.stopWriterThreads();
    CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
  }

  public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
    throws InterruptedException {
    Waiter.waitFor(HBaseConfiguration.create(), 10000,
      () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey)));
  }

  public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
    while (!cache.ramCache.isEmpty()) {
      Thread.sleep(100);
    }
    Thread.sleep(1000);
  }

  // BucketCache.cacheBlock is asynchronous: it first adds the block to the ramCache and the write
  // queue, then writer threads flush it to the bucket and put a reference entry in the backingMap.
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
    Cacheable block, boolean waitWhenCache) throws InterruptedException {
    cache.cacheBlock(cacheKey, block, false, waitWhenCache);
    waitUntilFlushedToBucket(cache, cacheKey);
  }
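
  /*
   * A minimal usage sketch for the helper above (the key and block here are hypothetical):
   *
   *   BlockCacheKey key = new BlockCacheKey("example-file", 0L);
   *   Cacheable block = new CacheTestUtils.ByteArrayCacheable(new byte[10]);
   *   cacheAndWaitUntilFlushedToBucket(cache, key, block, false);
   *   // From here on the block is served from the bucket, not the RAM cache:
   *   assertTrue(cache.backingMap.containsKey(key));
   *   assertFalse(cache.ramCache.containsKey(key));
   */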

  @Test
  public void testMemoryLeak() throws Exception {
    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
    long lockId = cache.backingMap.get(cacheKey).offset();
    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
    lock.writeLock().lock();
    Thread evictThread = new Thread("evict-block") {
      @Override
      public void run() {
        cache.evictBlock(cacheKey);
      }
    };
    evictThread.start();
    cache.offsetLock.waitForWaiters(lockId, 1);
    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true);
    assertEquals(0, cache.getBlockCount());
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
    assertEquals(1, cache.getBlockCount());
    lock.writeLock().unlock();
    evictThread.join();
    /**
     * <pre>
     * The asserts here before HBASE-21957 were:
     * assertEquals(1L, cache.getBlockCount());
     * assertTrue(cache.getCurrentSize() > 0L);
     * assertTrue("We should have a block!", cache.iterator().hasNext());
     *
     * The asserts here after HBASE-21957 were:
     * assertEquals(0, cache.getBlockCount());
     * assertEquals(cache.getCurrentSize(), 0L);
     *
     * I think the asserts before HBASE-21957 are more reasonable, because
     * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
     * it has seen, and a newly added block cached after that {@link BucketEntry}
     * was seen should not be evicted.
     * </pre>
     */
    assertEquals(1L, cache.getBlockCount());
    assertTrue(cache.getCurrentSize() > 0L);
    assertTrue("We should have a block!", cache.iterator().hasNext());
  }

  @Test
  public void testRetrieveFromFile() throws Exception {
    Path testDir = createAndGetTestDir();
    String ioEngineName = "file:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDir + "/bucket.persistence";
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        smallBucketSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertFalse(new File(persistencePath).exists());
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testRetrieveFromMMap() throws Exception {
    final Path testDir = createAndGetTestDir();
    final String ioEngineName = "mmap:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
  }

  @Test
  public void testRetrieveFromPMem() throws Exception {
    final Path testDir = createAndGetTestDir();
    final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        smallBucketSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertFalse(new File(persistencePath).exists());
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  private void testRetrievalUtils(Path testDir, String ioEngineName)
    throws IOException, InterruptedException {
    final String persistencePath =
      testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      for (HFileBlockPair block : blocks) {
        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
      }
      for (HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
          false);
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));

      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
    }
    assertTrue(new File(persistencePath).exists());
  }

  @Test
  public void testRetrieveUnsupportedIOE() throws Exception {
    try {
      final Path testDir = createAndGetTestDir();
      final String ioEngineName = testDir + "/bucket.cache";
      testRetrievalUtils(testDir, ioEngineName);
      Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!");
    } catch (IllegalArgumentException e) {
      Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, "
        + "files:, mmap: or offheap", e.getMessage());
    }
  }

  @Test
  public void testRetrieveFromMultipleFiles() throws Exception {
    final Path testDirInitial = createAndGetTestDir();
    final Path newTestDir = new HBaseTestingUtil().getDataTestDir();
    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir);
    String ioEngineName =
      new StringBuilder("files:").append(testDirInitial).append("/bucket1.cache")
        .append(FileIOEngine.FILE_DELIMITER).append(newTestDir).append("/bucket2.cache").toString();
    testRetrievalUtils(testDirInitial, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDirInitial + "/bucket.persistence";
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        smallBucketSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertFalse(new File(persistencePath).exists());
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testRetrieveFromFileWithoutPersistence() throws Exception {
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null);
    assertTrue(bucketCache.waitForCacheInitialization(10000));
    try {
      final Path testDir = createAndGetTestDir();
      String ioEngineName = "file:" + testDir + "/bucket.cache";
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      for (HFileBlockPair block : blocks) {
        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
      }
      for (HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
          false);
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      bucketCache.shutdown();
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, null);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException {
    long availableSpace = 20 * 1024L * 1024 * 1024;
    int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 };
    BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes);
    assertTrue(allocator.getBuckets().length > 0);
  }
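
  /*
   * Note on the arithmetic in testBucketAllocatorLargeBuckets: the L suffix in
   * "20 * 1024L * 1024 * 1024" forces the multiplication into long arithmetic. Evaluated in
   * 32-bit int arithmetic instead, 20 * 1024 * 1024 * 1024 = 21474836480 = 5 * 2^32, which
   * wraps to 0.
   */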

  @Test
  public void testGetPartitionSize() throws IOException {
    // Test default values
    validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR);

    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
    assertTrue(cache.waitForCacheInitialization(10000));

    validateGetPartitionSize(cache, 0.1f, 0.5f);
    validateGetPartitionSize(cache, 0.7f, 0.5f);
    validateGetPartitionSize(cache, 0.2f, 0.5f);
  }

  @Test
  public void testCacheSizeCapacity() throws IOException {
    // Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE
    validateGetPartitionSize(cache, DEFAULT_SINGLE_FACTOR, DEFAULT_MIN_FACTOR);
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
    try {
      new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads,
        writerQLen, null, 100, conf);
      Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!");
    } catch (IllegalArgumentException e) {
      Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage());
    }
  }
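
  /*
   * A sketch of where the 32TB bound above plausibly comes from, assuming the check hinted at in
   * the comment (capacity / blockSize must fit in an int) and a hypothetical 16 KB block size:
   *   2^31 blocks * 2^14 bytes per block = 2^45 bytes = 32 TB
   */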

  @Test
  public void testValidBucketCacheConfigs() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
    conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
    assertTrue(cache.waitForCacheInitialization(10000));

    assertEquals(ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
      cache.getAcceptableFactor(), 0);
    assertEquals(MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, cache.getMinFactor(), 0);
    assertEquals(EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
      cache.getExtraFreeFactor(), 0);
    assertEquals(SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f, cache.getSingleFactor(),
      0);
    assertEquals(MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f, cache.getMultiFactor(),
      0);
    assertEquals(MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f, cache.getMemoryFactor(),
      0);
  }

  @Test
  public void testInvalidAcceptFactorConfig() throws IOException {
    float[] configValues = { -1f, 0.2f, 0.86f, 1.05f };
    boolean[] expectedOutcomes = { false, false, true, false };
    Map<String, float[]> configMappings = ImmutableMap.of(ACCEPT_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidMinFactorConfig() throws IOException {
    float[] configValues = { -1f, 0f, 0.96f, 1.05f };
    // throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0
    boolean[] expectedOutcomes = { false, true, false, false };
    Map<String, float[]> configMappings = ImmutableMap.of(MIN_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidExtraFreeFactorConfig() throws IOException {
    float[] configValues = { -1f, 0f, 0.2f, 1.05f };
    // throws due to <0, in expected range, in expected range, config can be > 1.0
    boolean[] expectedOutcomes = { false, true, true, true };
    Map<String, float[]> configMappings =
      ImmutableMap.of(EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidCacheSplitFactorConfig() throws IOException {
    float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f };
    float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f };
    float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f };
    // Per-case outcomes: all configs are between 0 and 1.0 and add up to 1.0; configs don't add
    // up to 1.0; configs can't be negative; configs don't add up to 1.0.
    boolean[] expectedOutcomes = { true, false, false, false };
    Map<String,
      float[]> configMappings = ImmutableMap.of(SINGLE_FACTOR_CONFIG_NAME, singleFactorConfigValues,
        MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues, MEMORY_FACTOR_CONFIG_NAME,
        memoryFactorConfigValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  private void checkConfigValues(Configuration conf, Map<String, float[]> configMap,
    boolean[] expectSuccess) throws IOException {
    Set<String> configNames = configMap.keySet();
    for (int i = 0; i < expectSuccess.length; i++) {
      try {
        for (String configName : configNames) {
          conf.setFloat(configName, configMap.get(configName)[i]);
        }
        BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
        assertTrue(cache.waitForCacheInitialization(10000));
        assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i]
          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
      } catch (IllegalArgumentException e) {
        assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i]
          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
      }
    }
  }

  private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor,
    float minFactor) {
    long expectedOutput =
      (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor);
    assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor));
  }
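
  /*
   * Worked example of the formula above, using the values set in testGetPartitionSize
   * (singleFactor = 0.1, minFactor = 0.5) and a hypothetical 32 MB total allocator size:
   *   expected = floor(32 * 1024 * 1024 * 0.1 * 0.5) = floor(1677721.6) = 1677721
   */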

  @Test
  public void testOffsetProducesPositiveOutput() {
    // This number is picked because it produces negative output if the value isn't ensured to be
    // positive. See HBASE-18757 for more information.
    long testValue = 549888460800L;
    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> {
      return ByteBuffAllocator.NONE;
    }, ByteBuffAllocator.HEAP);
    assertEquals(testValue, bucketEntry.offset());
  }
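
  /*
   * Why 549888460800L is an interesting test value (a sketch, assuming the packing scheme that
   * HBASE-18757 fixed, where the offset is stored as a 32-bit base after dropping the low 8
   * alignment bits):
   *   549888460800 >> 8 = 2148001800, which exceeds Integer.MAX_VALUE (2147483647),
   * so the truncated int base is negative, and reconstructing the offset without an unsigned
   * mask such as (base & 0xFFFFFFFFL) produces a negative offset.
   */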

  @Test
  public void testEvictionCount() throws InterruptedException {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
    HFileContext meta = new HFileContextBuilder().build();
    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

    BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0);
    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(block1Buffer, true);
    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);

    assertEquals(0, cache.getStats().getEvictionCount());

    // The evict call should return 1, but the eviction count should remain 0.
    assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount"));
    assertEquals(0, cache.getStats().getEvictionCount());

    // add back
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);
    waitUntilFlushedToBucket(cache, key);

    // should not increment
    assertTrue(cache.evictBlock(key));
    assertEquals(0, cache.getStats().getEvictionCount());

    // add back
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);
    waitUntilFlushedToBucket(cache, key);

    // should finally increment eviction count
    cache.freeSpace("testing");
    assertEquals(1, cache.getStats().getEvictionCount());
  }

  @Test
  public void testCacheBlockNextBlockMetadataMissing() throws Exception {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
    HFileContext meta = new HFileContextBuilder().build();
    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

    BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(block1Buffer, true);
    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertNotNull(cache.backingMap.get(key));
    assertEquals(1, cache.backingMap.get(key).refCnt());
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithoutNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block1Buffer);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, cache.backingMap.get(key).refCnt());

    // Clear and add blockWithoutNextBlockMetadata
    assertTrue(cache.evictBlock(key));
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    assertNull(cache.getBlock(key, false, false, false));
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block2Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
  }

  @Test
  public void testRAMCache() {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    byte[] byteArr = new byte[length];
    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
    HFileContext meta = new HFileContextBuilder().build();

    RAMCache cache = new RAMCache();
    BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
    BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
    HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
    HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false);
    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false);

    assertFalse(cache.containsKey(key1));
    assertNull(cache.putIfAbsent(key1, re1));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());

    assertNotNull(cache.putIfAbsent(key1, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    assertNull(cache.putIfAbsent(key2, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    cache.remove(key1);
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    cache.clear();
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
  }
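
  /*
   * Reference-count contract demonstrated by testRAMCache above:
   *   putIfAbsent(key, entry)          -> retains the entry's buffer (refCnt 1 -> 2)
   *   putIfAbsent with an existing key -> no-op for the new entry (its refCnt stays 1)
   *   remove(key) / clear()            -> releases the retained buffer (refCnt back to 1)
   */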

  @Test
  public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
    // Initialize a block.
    int size = 100, offset = 20;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf = ByteBuffer.allocate(length);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);

    // Initialize a mocked IOEngine that fails every write.
    IOEngine ioEngine = Mockito.mock(IOEngine.class);
    when(ioEngine.usesSharedMemory()).thenReturn(false);
    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
      Mockito.anyLong());
    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class),
      Mockito.anyLong());

    // Create a bucket allocator.
    long availableSpace = 1024 * 1024 * 1024L;
    BucketAllocator allocator = new BucketAllocator(availableSpace, null);

    BlockCacheKey key = new BlockCacheKey("dummy", 1L);
    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false);

    Assert.assertEquals(0, allocator.getUsedSize());
    try {
      re.writeToCache(ioEngine, allocator, null, null,
        ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
      Assert.fail();
    } catch (Exception e) {
    }
    Assert.assertEquals(0, allocator.getUsedSize());
  }
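
  /*
   * The point of the test above: RAMQueueEntry.writeToCache allocates a bucket slot before
   * calling IOEngine.write, so when the write throws, the slot must be freed again; otherwise
   * allocator.getUsedSize() would remain non-zero and the space would leak.
   */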

  /**
   * This test is for HBASE-26295: a {@link BucketEntry} restored from a persistence file could
   * not be freed even when the corresponding {@link HFileBlock} was evicted from the
   * {@link BucketCache}.
   */
  @Test
  public void testFreeBucketEntryRestoredFromFile() throws Exception {
    BucketCache bucketCache = null;
    try {
      final Path dataTestDir = createAndGetTestDir();

      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";

      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedByteSize);

      HFileBlockPair[] hfileBlockPairs =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      // Add blocks
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock());
      }

      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
          hfileBlockPair.getBlock(), false);
      }
      usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedByteSize);
      // persist cache to file
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());

      // restore cache from file
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());

      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
        bucketCache.evictBlock(blockCacheKey);
      }
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testBlockAdditionWaitWhenCache() throws Exception {
    BucketCache bucketCache = null;
    try {
      final Path dataTestDir = createAndGetTestDir();

      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";

      Configuration config = HBASE_TESTING_UTILITY.getConfiguration();
      config.setLong(QUEUE_ADDITION_WAIT_TIME, 1000);

      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, 1, 1, persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, config);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedByteSize);

      HFileBlockPair[] hfileBlockPairs =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
      // Add blocks
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
          true);
      }

      // Max wait for 10 seconds.
      long timeout = 10000;
      // Wait for blocks size to match the number of blocks.
      while (bucketCache.backingMap.size() != 10) {
        if (timeout <= 0) break;
        Threads.sleep(100);
        timeout -= 100;
      }
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
      }
      usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedByteSize);
      // persist cache to file
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());

      // restore cache from file
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());

      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
        bucketCache.evictBlock(blockCacheKey);
      }
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testOnConfigurationChange() throws Exception {
    BucketCache bucketCache = null;
    try {
      final Path dataTestDir = createAndGetTestDir();

      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";

      Configuration config = HBASE_TESTING_UTILITY.getConfiguration();

      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, 1, 1, null, DEFAULT_ERROR_TOLERATION_DURATION, config);

      assertTrue(bucketCache.waitForCacheInitialization(10000));

      config.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
      config.setFloat(MIN_FACTOR_CONFIG_NAME, 0.8f);
      config.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.15f);
      config.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.2f);
      config.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.6f);
      config.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.2f);
      config.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
      config.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 500);
      config.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, 1000);

      bucketCache.onConfigurationChange(config);

      assertEquals(0.9f, bucketCache.getAcceptableFactor(), 0.01);
      assertEquals(0.8f, bucketCache.getMinFactor(), 0.01);
      assertEquals(0.15f, bucketCache.getExtraFreeFactor(), 0.01);
      assertEquals(0.2f, bucketCache.getSingleFactor(), 0.01);
      assertEquals(0.6f, bucketCache.getMultiFactor(), 0.01);
      assertEquals(0.2f, bucketCache.getMemoryFactor(), 0.01);
      assertEquals(100L, bucketCache.getQueueAdditionWaitTime());
      assertEquals(500L, bucketCache.getBucketcachePersistInterval());
      assertEquals(1000L, bucketCache.getPersistenceChunkSize());

    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testNotifyFileCachingCompletedSuccess() throws Exception {
    BucketCache bucketCache = null;
    try {
      Path filePath =
        new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testNotifyFileCachingCompletedSuccess");
      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, false);
      if (bucketCache.getStats().getFailedInserts() > 0) {
        LOG.info("There were {} failed inserts, "
          + "so assert that the total blocks in backingMap equals (10 - failedInserts) "
          + "and the file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts());
        assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size());
        assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
      } else {
        assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
      }
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testNotifyFileCachingCompletedForEncodedDataSuccess() throws Exception {
    BucketCache bucketCache = null;
    try {
      Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(),
        "testNotifyFileCachingCompletedForEncodedDataSuccess");
      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10, true);
      if (bucketCache.getStats().getFailedInserts() > 0) {
        LOG.info("There were {} failed inserts, "
          + "so assert that the total blocks in backingMap equals (10 - failedInserts) "
          + "and the file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts());
        assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size());
        assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
      } else {
        assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
      }
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testNotifyFileCachingCompletedNotAllCached() throws Exception {
    BucketCache bucketCache = null;
    try {
      Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(),
        "testNotifyFileCachingCompletedNotAllCached");
      // Deliberately passing more blocks than we have created to test that
      // notifyFileCachingCompleted will not consider the file fully cached
      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 12, false);
      assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  private BucketCache testNotifyFileCachingCompletedForTenBlocks(Path filePath,
    int totalBlocksToCheck, boolean encoded) throws Exception {
    final Path dataTestDir = createAndGetTestDir();
    String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, 1, 1, null);
    assertTrue(bucketCache.waitForCacheInitialization(10000));
    long usedByteSize = bucketCache.getAllocator().getUsedSize();
    assertEquals(0, usedByteSize);
    HFileBlockPair[] hfileBlockPairs =
      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, filePath, encoded);
    // Add blocks
    for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
      bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, true);
    }
    bucketCache.notifyFileCachingCompleted(filePath, totalBlocksToCheck, totalBlocksToCheck,
      totalBlocksToCheck * constructedBlockSize);
    return bucketCache;
  }

  @Test
  public void testEvictOrphansOutOfGracePeriod() throws Exception {
    BucketCache bucketCache = testEvictOrphans(0);
    assertEquals(10, bucketCache.getBackingMap().size());
    assertEquals(0, bucketCache.blocksByHFile.stream()
      .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count());
  }

  @Test
  public void testEvictOrphansWithinGracePeriod() throws Exception {
    BucketCache bucketCache = testEvictOrphans(60 * 60 * 1000L);
    assertEquals(18, bucketCache.getBackingMap().size());
    assertTrue(bucketCache.blocksByHFile.stream()
      .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count() > 0);
  }
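
  /*
   * The two tests above exercise BLOCK_ORPHAN_GRACE_PERIOD. Blocks whose hfile is not referenced
   * by any online region's store files count as orphans:
   *   grace period = 0      -> freeSpace() reclaims the orphans first, so only the 10 valid
   *                            blocks survive.
   *   grace period = 1 hour -> the orphans are too recent to reclaim, so eviction falls back to
   *                            the ordinary factor-driven policy (18 of the 20 blocks survive).
   */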

  private BucketCache testEvictOrphans(long orphanEvictionGracePeriod) throws Exception {
    Path validFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-valid");
    Path orphanFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-orphan");
    Map<String, HRegion> onlineRegions = new HashMap<>();
    List<HStore> stores = new ArrayList<>();
    Collection<HStoreFile> storeFiles = new ArrayList<>();
    HRegion mockedRegion = mock(HRegion.class);
    HStore mockedStore = mock(HStore.class);
    HStoreFile mockedStoreFile = mock(HStoreFile.class);
    when(mockedStoreFile.getPath()).thenReturn(validFile);
    storeFiles.add(mockedStoreFile);
    when(mockedStore.getStorefiles()).thenReturn(storeFiles);
    stores.add(mockedStore);
    when(mockedRegion.getStores()).thenReturn(stores);
    onlineRegions.put("mocked_region", mockedRegion);
    HBASE_TESTING_UTILITY.getConfiguration().setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
    HBASE_TESTING_UTILITY.getConfiguration().setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
    HBASE_TESTING_UTILITY.getConfiguration().setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
    HBASE_TESTING_UTILITY.getConfiguration().setLong(BLOCK_ORPHAN_GRACE_PERIOD,
      orphanEvictionGracePeriod);
    BucketCache bucketCache = new BucketCache(ioEngineName, (constructedBlockSize + 1024) * 21,
      constructedBlockSize, new int[] { constructedBlockSize + 1024 }, 1, 1, null, 60 * 1000,
      HBASE_TESTING_UTILITY.getConfiguration(), onlineRegions);
    HFileBlockPair[] validBlockPairs =
      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, validFile);
    HFileBlockPair[] orphanBlockPairs =
      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, orphanFile);
    for (HFileBlockPair pair : validBlockPairs) {
      bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
    }
    waitUntilAllFlushedToBucket(bucketCache);
    assertEquals(10, bucketCache.getBackingMap().size());
    bucketCache.freeSpace("test");
    assertEquals(10, bucketCache.getBackingMap().size());
    for (HFileBlockPair pair : orphanBlockPairs) {
      bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
    }
    waitUntilAllFlushedToBucket(bucketCache);
    assertEquals(20, bucketCache.getBackingMap().size());
    bucketCache.freeSpace("test");
    return bucketCache;
  }
}