/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BLOCK_ORPHAN_GRACE_PERIOD;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * Basic test of BucketCache: puts and gets.
 * <p>
 * Tests ensure the correctness of block data under concurrent access from several threads.
 */
@RunWith(Parameterized.class)
@Category({ IOTests.class, LargeTests.class })
public class TestBucketCache {

  private static final Logger LOG = LoggerFactory.getLogger(TestBucketCache.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBucketCache.class);

  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] { { 8192, null }, // TODO: why is 8k the default blocksize
                                                          // for these tests?
      { 16 * 1024,
        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
          128 * 1024 + 1024 } } });
  }

  @Parameterized.Parameter(0)
  public int constructedBlockSize;

  @Parameterized.Parameter(1)
  public int[] constructedBlockSizes;

  BucketCache cache;
  final int CACHE_SIZE = 1000000;
  final int NUM_BLOCKS = 100;
  final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
  final int NUM_THREADS = 100;
  final int NUM_QUERIES = 10000;

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
  private String ioEngineName = "offheap";

  private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil();

  private static class MockedBucketCache extends BucketCache {

    public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreads, int writerQLen, String persistencePath) throws IOException {
      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
        persistencePath);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
      super.cacheBlock(cacheKey, buf, inMemory);
    }

    @Override
    public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
      super.cacheBlock(cacheKey, buf);
    }
  }

  @Before
  public void setup() throws IOException {
    cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null);
  }

  @After
  public void tearDown() {
    cache.shutdown();
  }

  /**
   * Test utility to create the test directory and return its path.
   * @return the path of the created test directory
   * @throws IOException if the directory cannot be created
   */
  private Path createAndGetTestDir() throws IOException {
    final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
    return testDir;
  }

  /**
   * Return a random element from {@code a}.
   */
  private static <T> T randFrom(List<T> a) {
    return a.get(ThreadLocalRandom.current().nextInt(a.size()));
  }

  @Test
  public void testBucketAllocator() throws BucketAllocatorException {
    BucketAllocator mAllocator = cache.getAllocator();
    /*
     * Test the allocator first
     */
    final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);

    boolean full = false;
    ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>();
    // Fill the allocated extents by choosing a random block size. Continue allocating blocks
    // until the cache is completely full.
    List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
    while (!full) {
      Integer blockSize = null;
      try {
        blockSize = randFrom(tmp);
        allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize));
      } catch (CacheFullException cfe) {
        tmp.remove(blockSize);
        if (tmp.isEmpty()) full = true;
      }
    }

    for (Integer blockSize : BLOCKSIZES) {
      BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
      IndexStatistics indexStatistics = bucketSizeInfo.statistics();
      assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());

      // We know the block sizes above are multiples of 1024, but the default bucket sizes give an
      // additional 1024 on top of that, so every allocation here counts towards fragmentation in
      // our test. Real life may fragment worse, because blocks are rarely perfectly sized to the
      // block size, given encoding/compression and large rows.
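      // Worked example: a 4096-byte request is served from a (4 * 1024 + 1024)-byte bucket item,
      // wasting exactly 1024 bytes, so fragmentationBytes works out to 1024 per allocated item.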
      assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes());
    }

    mAllocator.logDebugStatistics();

    for (Pair<Long, Integer> allocation : allocations) {
      assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()),
        mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond()));
    }
    assertEquals(0, mAllocator.getUsedSize());
  }

  @Test
  public void testCacheSimple() throws Exception {
    CacheTestUtils.testCacheSimple(cache, BLOCK_SIZE, NUM_QUERIES);
  }

  @Test
  public void testCacheMultiThreadedSingleKey() throws Exception {
    CacheTestUtils.hammerSingleKey(cache, 2 * NUM_THREADS, 2 * NUM_QUERIES);
  }

  @Test
  public void testHeapSizeChanges() throws Exception {
    cache.stopWriterThreads();
    CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
  }

  public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
    throws InterruptedException {
    Waiter.waitFor(HBaseConfiguration.create(), 10000,
      () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey)));
  }

  public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
    while (!cache.ramCache.isEmpty()) {
      Thread.sleep(100);
    }
    Thread.sleep(1000);
  }

  // BucketCache.cacheBlock is async: it first adds the block to ramCache and the write queue, then
  // writer threads flush it to the bucket and put a reference entry in backingMap.
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
    Cacheable block, boolean waitWhenCache) throws InterruptedException {
    cache.cacheBlock(cacheKey, block, false, waitWhenCache);
    waitUntilFlushedToBucket(cache, cacheKey);
  }

  @Test
  public void testMemoryLeak() throws Exception {
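    // Simulated race: hold the offset write lock so a concurrent evictBlock() stalls, remove the
    // entry out from under it and cache a new block under the same key, then release the lock.
    // The stalled eviction must not free the newly added block (see the HBASE-21957 note below).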
    final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
    long lockId = cache.backingMap.get(cacheKey).offset();
    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
    lock.writeLock().lock();
    Thread evictThread = new Thread("evict-block") {
      @Override
      public void run() {
        cache.evictBlock(cacheKey);
      }
    };
    evictThread.start();
    cache.offsetLock.waitForWaiters(lockId, 1);
    cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true);
    assertEquals(0, cache.getBlockCount());
    cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
    assertEquals(1, cache.getBlockCount());
    lock.writeLock().unlock();
    evictThread.join();
    /**
     * <pre>
     * The asserts here before HBASE-21957 were:
     * assertEquals(1L, cache.getBlockCount());
     * assertTrue(cache.getCurrentSize() > 0L);
     * assertTrue("We should have a block!", cache.iterator().hasNext());
     *
     * The asserts here after HBASE-21957 were:
     * assertEquals(0, cache.getBlockCount());
     * assertEquals(cache.getCurrentSize(), 0L);
     *
     * I think the asserts before HBASE-21957 are more reasonable, because
     * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
     * it has seen, and a block newly added after that {@link BucketEntry}
     * should not be evicted.
     * </pre>
     */
    assertEquals(1L, cache.getBlockCount());
    assertTrue(cache.getCurrentSize() > 0L);
    assertTrue("We should have a block!", cache.iterator().hasNext());
  }

  @Test
  public void testRetrieveFromFile() throws Exception {
    Path testDir = createAndGetTestDir();
    String ioEngineName = "file:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDir + "/bucket.persistence";
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        smallBucketSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertFalse(new File(persistencePath).exists());
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testRetrieveFromMMap() throws Exception {
    final Path testDir = createAndGetTestDir();
    final String ioEngineName = "mmap:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
  }

  @Test
  public void testRetrieveFromPMem() throws Exception {
    final Path testDir = createAndGetTestDir();
    final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
    testRetrievalUtils(testDir, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        smallBucketSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertFalse(new File(persistencePath).exists());
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  private void testRetrievalUtils(Path testDir, String ioEngineName)
    throws IOException, InterruptedException {
    final String persistencePath =
      testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      for (HFileBlockPair block : blocks) {
        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
      }
      for (HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
          false);
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));

      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
    }
    assertTrue(new File(persistencePath).exists());
  }

  @Test
  public void testRetrieveUnsupportedIOE() throws Exception {
    try {
      final Path testDir = createAndGetTestDir();
      final String ioEngineName = testDir + "/bucket.cache";
      testRetrievalUtils(testDir, ioEngineName);
      Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!");
    } catch (IllegalArgumentException e) {
      Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, "
        + "files:, mmap: or offheap", e.getMessage());
    }
  }

  @Test
  public void testRetrieveFromMultipleFiles() throws Exception {
    final Path testDirInitial = createAndGetTestDir();
    final Path newTestDir = new HBaseTestingUtil().getDataTestDir();
    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir);
    String ioEngineName =
      new StringBuilder("files:").append(testDirInitial).append("/bucket1.cache")
        .append(FileIOEngine.FILE_DELIMITER).append(newTestDir).append("/bucket2.cache").toString();
    testRetrievalUtils(testDirInitial, ioEngineName);
    int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
    String persistencePath = testDirInitial + "/bucket.persistence";
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        smallBucketSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertFalse(new File(persistencePath).exists());
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testRetrieveFromFileWithoutPersistence() throws Exception {
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null);
    assertTrue(bucketCache.waitForCacheInitialization(10000));
    try {
      final Path testDir = createAndGetTestDir();
      String ioEngineName = "file:" + testDir + "/bucket.cache";
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      for (HFileBlockPair block : blocks) {
        bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
      }
      for (HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
          false);
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      bucketCache.shutdown();
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, null);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
    } finally {
      bucketCache.shutdown();
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException {
    long availableSpace = 20 * 1024L * 1024 * 1024;
    int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 };
    BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes);
    assertTrue(allocator.getBuckets().length > 0);
  }

  @Test
  public void testGetPartitionSize() throws IOException {
    // Test default values
    validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
      BucketCache.DEFAULT_MIN_FACTOR);

    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
    assertTrue(cache.waitForCacheInitialization(10000));

    validateGetPartitionSize(cache, 0.1f, 0.5f);
    validateGetPartitionSize(cache, 0.7f, 0.5f);
    validateGetPartitionSize(cache, 0.2f, 0.5f);
  }

  @Test
  public void testCacheSizeCapacity() throws IOException {
    // Test that cache capacity is validated: (capacity / blockSize) must stay below
    // Integer.MAX_VALUE.
    validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
      BucketCache.DEFAULT_MIN_FACTOR);
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
    try {
      new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads,
        writerQLen, null, 100, conf);
      Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!");
    } catch (IllegalArgumentException e) {
      Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage());
    }
  }

  @Test
  public void testValidBucketCacheConfigs() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
    conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
    conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
    conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
    conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

    BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
    assertTrue(cache.waitForCacheInitialization(10000));

    assertEquals(ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
      cache.getAcceptableFactor(), 0);
    assertEquals(MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f, cache.getMinFactor(), 0);
    assertEquals(EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
      cache.getExtraFreeFactor(), 0);
    assertEquals(BucketCache.SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.1f,
      cache.getSingleFactor(), 0);
    assertEquals(BucketCache.MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", 0.7f,
      cache.getMultiFactor(), 0);
    assertEquals(BucketCache.MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", 0.2f,
      cache.getMemoryFactor(), 0);
  }

  @Test
  public void testInvalidAcceptFactorConfig() throws IOException {
    float[] configValues = { -1f, 0.2f, 0.86f, 1.05f };
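    // Presumably: throws due to < 0, below the default min factor, in expected range, > 1.0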
    boolean[] expectedOutcomes = { false, false, true, false };
    Map<String, float[]> configMappings = ImmutableMap.of(ACCEPT_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidMinFactorConfig() throws IOException {
    float[] configValues = { -1f, 0f, 0.96f, 1.05f };
    // throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0
    boolean[] expectedOutcomes = { false, true, false, false };
    Map<String, float[]> configMappings = ImmutableMap.of(MIN_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidExtraFreeFactorConfig() throws IOException {
    float[] configValues = { -1f, 0f, 0.2f, 1.05f };
    // throws due to <0, in expected range, in expected range, config can be > 1.0
    boolean[] expectedOutcomes = { false, true, true, true };
    Map<String, float[]> configMappings =
      ImmutableMap.of(EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

  @Test
  public void testInvalidCacheSplitFactorConfig() throws IOException {
    float[] singleFactorConfigValues = { 0.2f, 0f, -0.2f, 1f };
    float[] multiFactorConfigValues = { 0.4f, 0f, 1f, .05f };
    float[] memoryFactorConfigValues = { 0.4f, 0f, 0.2f, .5f };
    // Column i across the three arrays forms one scenario. Only the first sums to 1.0 with every
    // factor in range; the second and fourth don't sum to 1.0, and the third has a negative value.
    boolean[] expectedOutcomes = { true, false, false, false };
    Map<String, float[]> configMappings =
      ImmutableMap.of(BucketCache.SINGLE_FACTOR_CONFIG_NAME, singleFactorConfigValues,
        BucketCache.MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues,
        BucketCache.MEMORY_FACTOR_CONFIG_NAME, memoryFactorConfigValues);
    Configuration conf = HBaseConfiguration.create();
    checkConfigValues(conf, configMappings, expectedOutcomes);
  }

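  /**
   * Drives one BucketCache construction per scenario: column {@code i} of each array in
   * {@code configMap} forms scenario {@code i}, and {@code expectSuccess[i]} says whether that
   * construction should complete without an IllegalArgumentException.
   */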
  private void checkConfigValues(Configuration conf, Map<String, float[]> configMap,
    boolean[] expectSuccess) throws IOException {
    Set<String> configNames = configMap.keySet();
    for (int i = 0; i < expectSuccess.length; i++) {
      try {
        for (String configName : configNames) {
          conf.setFloat(configName, configMap.get(configName)[i]);
        }
        BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
        assertTrue(cache.waitForCacheInitialization(10000));
        assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i]
          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
      } catch (IllegalArgumentException e) {
        assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i]
          + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
      }
    }
  }

  private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor,
    float minFactor) {
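    // Worked example (illustrative numbers): assuming the allocator's total size is the full
    // 32 MiB capacity (33554432 bytes), the partitionFactor = 0.1 and minFactor = 0.5 used in
    // testGetPartitionSize give floor(33554432 * 0.1 * 0.5) = 1677721 bytes.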
    long expectedOutput =
      (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor);
    assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor));
  }

  @Test
  public void testOffsetProducesPositiveOutput() {
    // This number is picked because it produces negative output if the value isn't ensured to be
    // positive. See HBASE-18757 for more information.
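    // Arithmetic behind the magic number: BucketEntry stores the 256-byte-aligned offset shifted
    // right by 8 bits, split across an int and a byte. 549888460800 >> 8 = 2148001800, which
    // exceeds Integer.MAX_VALUE (2147483647), so the int part has its sign bit set; reading it
    // back without masking with 0xFFFFFFFFL would sign-extend and yield a negative offset.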
    long testValue = 549888460800L;
    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true, (entry) -> {
      return ByteBuffAllocator.NONE;
    }, ByteBuffAllocator.HEAP);
    assertEquals(testValue, bucketEntry.offset());
  }

  @Test
  public void testEvictionCount() throws InterruptedException {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
    HFileContext meta = new HFileContextBuilder().build();
    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

    BlockCacheKey key = new BlockCacheKey("testEvictionCount", 0);
    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(block1Buffer, true);
    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);

    assertEquals(0, cache.getStats().getEvictionCount());

    // The evict call should return 1, but the eviction count should stay at 0.
    assertEquals(1, cache.evictBlocksByHfileName("testEvictionCount"));
    assertEquals(0, cache.getStats().getEvictionCount());

    // add back
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);
    waitUntilFlushedToBucket(cache, key);

    // should not increment
    assertTrue(cache.evictBlock(key));
    assertEquals(0, cache.getStats().getEvictionCount());

    // add back
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);
    waitUntilFlushedToBucket(cache, key);

    // should finally increment eviction count
    cache.freeSpace("testing");
    assertEquals(1, cache.getStats().getEvictionCount());
  }

  @Test
  public void testCacheBlockNextBlockMetadataMissing() throws Exception {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
    HFileContext meta = new HFileContextBuilder().build();
    ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
      ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);

    BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
    ByteBuffer actualBuffer = ByteBuffer.allocate(length);
    ByteBuffer block1Buffer = ByteBuffer.allocate(length);
    ByteBuffer block2Buffer = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(block1Buffer, true);
    blockWithoutNextBlockMetadata.serialize(block2Buffer, true);

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertNotNull(cache.backingMap.get(key));
    assertEquals(1, cache.backingMap.get(key).refCnt());
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithoutNextBlockMetadata, expect blockWithNextBlockMetadata back.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block1Buffer);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, cache.backingMap.get(key).refCnt());

    // Clear and add blockWithoutNextBlockMetadata
    assertTrue(cache.evictBlock(key));
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    assertNull(cache.getBlock(key, false, false, false));
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
      block2Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());

    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
    CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
      block1Buffer);

    waitUntilFlushedToBucket(cache, key);
    assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
    assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
  }

  @Test
  public void testRAMCache() {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    byte[] byteArr = new byte[length];
    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
    HFileContext meta = new HFileContextBuilder().build();

    RAMCache cache = new RAMCache();
    BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
    BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
    HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
    HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false);
    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false);

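    // The refCnt assertions below document RAMCache's retain/release contract: a successful
    // putIfAbsent retains the block's buffer (refCnt 1 -> 2), a failed putIfAbsent leaves the
    // rejected entry untouched, and remove()/clear() release again.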
    assertFalse(cache.containsKey(key1));
    assertNull(cache.putIfAbsent(key1, re1));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());

    assertNotNull(cache.putIfAbsent(key1, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    assertNull(cache.putIfAbsent(key2, re2));
    assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    cache.remove(key1);
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());

    cache.clear();
    assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
    assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
  }

  @Test
  public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
    // Initialize a block.
    int size = 100, offset = 20;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    ByteBuffer buf = ByteBuffer.allocate(length);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
      HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);

    // Initialize a mocked IOEngine.
    IOEngine ioEngine = Mockito.mock(IOEngine.class);
    when(ioEngine.usesSharedMemory()).thenReturn(false);
    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
      Mockito.anyLong());
    Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuff.class),
      Mockito.anyLong());

    // Create a bucket allocator.
    long availableSpace = 1024 * 1024 * 1024L;
    BucketAllocator allocator = new BucketAllocator(availableSpace, null);

    BlockCacheKey key = new BlockCacheKey("dummy", 1L);
    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false);

    Assert.assertEquals(0, allocator.getUsedSize());
    try {
      re.writeToCache(ioEngine, allocator, null, null,
        ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
      Assert.fail();
    } catch (Exception e) {
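      // Expected: the mocked IOEngine throws on write, and writeToCache must release the
      // allocation again (asserted below).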
    }
    Assert.assertEquals(0, allocator.getUsedSize());
  }

  /**
   * This test is for HBASE-26295: a {@link BucketEntry} restored from a persistence file could not
   * be freed even after the corresponding {@link HFileBlock} was evicted from the
   * {@link BucketCache}.
   */
  @Test
  public void testFreeBucketEntryRestoredFromFile() throws Exception {
    BucketCache bucketCache = null;
    try {
      final Path dataTestDir = createAndGetTestDir();

      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";

      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedByteSize);

      HFileBlockPair[] hfileBlockPairs =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      // Add blocks
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock());
      }

      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
          hfileBlockPair.getBlock(), false);
      }
      usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedByteSize);
      // persist cache to file
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());

      // restore cache from file
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());

      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
        bucketCache.evictBlock(blockCacheKey);
      }
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testBlockAdditionWaitWhenCache() throws Exception {
    BucketCache bucketCache = null;
    try {
      final Path dataTestDir = createAndGetTestDir();

      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";

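      // QUEUE_ADDITION_WAIT_TIME (set to 1 second here) bounds how long cacheBlock with
      // waitWhenCache=true waits to enqueue a block for the single writer thread configured below.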
      Configuration config = HBASE_TESTING_UTILITY.getConfiguration();
      config.setLong(QUEUE_ADDITION_WAIT_TIME, 1000);

      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, 1, 1, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedByteSize);

      HFileBlockPair[] hfileBlockPairs =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
      // Add blocks
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
          true);
      }

      // Wait at most 10 seconds.
      long timeout = 10000;
      // Wait until the backing map holds all 10 blocks, or the timeout expires.
      while (bucketCache.backingMap.size() != 10) {
        if (timeout <= 0) break;
        Threads.sleep(100);
        timeout -= 100;
      }
      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
      }
      usedByteSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedByteSize);
      // persist cache to file
      bucketCache.shutdown();
      assertTrue(new File(persistencePath).exists());

      // restore cache from file
      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());

      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
        bucketCache.evictBlock(blockCacheKey);
      }
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testNotifyFileCachingCompletedSuccess() throws Exception {
    BucketCache bucketCache = null;
    try {
      Path filePath =
        new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testNotifyFileCachingCompletedSuccess");
      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10);
      if (bucketCache.getStats().getFailedInserts() > 0) {
        LOG.info("There were {} failed inserts; asserting that the backingMap holds "
          + "(10 - failedInserts) blocks and that the file isn't listed as fully cached.",
          bucketCache.getStats().getFailedInserts());
        assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size());
        assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
      } else {
        assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
      }
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  @Test
  public void testNotifyFileCachingCompletedNotAllCached() throws Exception {
    BucketCache bucketCache = null;
    try {
      Path filePath = new Path(HBASE_TESTING_UTILITY.getDataTestDir(),
        "testNotifyFileCachingCompletedNotAllCached");
      // Deliberately passing more blocks than we have created to test that
      // notifyFileCachingCompleted will not consider the file fully cached
      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 12);
      assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      HBASE_TESTING_UTILITY.cleanupTestDir();
    }
  }

  private BucketCache testNotifyFileCachingCompletedForTenBlocks(Path filePath,
    int totalBlocksToCheck) throws Exception {
    final Path dataTestDir = createAndGetTestDir();
    String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
    BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
      constructedBlockSizes, 1, 1, null);
    assertTrue(bucketCache.waitForCacheInitialization(10000));
    long usedByteSize = bucketCache.getAllocator().getUsedSize();
    assertEquals(0, usedByteSize);
    HFileBlockPair[] hfileBlockPairs =
      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, filePath);
    // Add blocks
    for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
      bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, true);
    }
    bucketCache.notifyFileCachingCompleted(filePath, totalBlocksToCheck, totalBlocksToCheck,
      totalBlocksToCheck * constructedBlockSize);
    return bucketCache;
  }

  @Test
  public void testEvictOrphansOutOfGracePeriod() throws Exception {
    BucketCache bucketCache = testEvictOrphans(0);
    assertEquals(10, bucketCache.getBackingMap().size());
    assertEquals(0, bucketCache.blocksByHFile.stream()
      .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count());
  }

  @Test
  public void testEvictOrphansWithinGracePeriod() throws Exception {
    BucketCache bucketCache = testEvictOrphans(60 * 60 * 1000L);
    assertEquals(18, bucketCache.getBackingMap().size());
    assertTrue(bucketCache.blocksByHFile.stream()
      .filter(key -> key.getHfileName().equals("testEvictOrphans-orphan")).count() > 0);
  }

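  /**
   * Caches ten blocks for a file still referenced by a mocked online region ("valid") and ten for
   * a file no region references ("orphan"), then triggers freeSpace. With a zero grace period the
   * orphan blocks are immediately evictable; within the grace period they must survive.
   */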
  private BucketCache testEvictOrphans(long orphanEvictionGracePeriod) throws Exception {
    Path validFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-valid");
    Path orphanFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testEvictOrphans-orphan");
    Map<String, HRegion> onlineRegions = new HashMap<>();
    List<HStore> stores = new ArrayList<>();
    Collection<HStoreFile> storeFiles = new ArrayList<>();
    HRegion mockedRegion = mock(HRegion.class);
    HStore mockedStore = mock(HStore.class);
    HStoreFile mockedStoreFile = mock(HStoreFile.class);
    when(mockedStoreFile.getPath()).thenReturn(validFile);
    storeFiles.add(mockedStoreFile);
    when(mockedStore.getStorefiles()).thenReturn(storeFiles);
    stores.add(mockedStore);
    when(mockedRegion.getStores()).thenReturn(stores);
    onlineRegions.put("mocked_region", mockedRegion);
    HBASE_TESTING_UTILITY.getConfiguration().setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
    HBASE_TESTING_UTILITY.getConfiguration().setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
    HBASE_TESTING_UTILITY.getConfiguration().setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
    HBASE_TESTING_UTILITY.getConfiguration().setLong(BLOCK_ORPHAN_GRACE_PERIOD,
      orphanEvictionGracePeriod);
    BucketCache bucketCache = new BucketCache(ioEngineName, (constructedBlockSize + 1024) * 21,
      constructedBlockSize, new int[] { constructedBlockSize + 1024 }, 1, 1, null, 60 * 1000,
      HBASE_TESTING_UTILITY.getConfiguration(), onlineRegions);
    HFileBlockPair[] validBlockPairs =
      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, validFile);
    HFileBlockPair[] orphanBlockPairs =
      CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10, orphanFile);
    for (HFileBlockPair pair : validBlockPairs) {
      bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
    }
    waitUntilAllFlushedToBucket(bucketCache);
    assertEquals(10, bucketCache.getBackingMap().size());
    bucketCache.freeSpace("test");
    assertEquals(10, bucketCache.getBackingMap().size());
    for (HFileBlockPair pair : orphanBlockPairs) {
      bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(), false, true);
    }
    waitUntilAllFlushedToBucket(bucketCache);
    assertEquals(20, bucketCache.getBackingMap().size());
    bucketCache.freeSpace("test");
    return bucketCache;
  }
}