/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Basic tests that check the cache file's integrity before starting a BucketCache backed by a
 * file IOEngine.
 */
@Category(SmallTests.class)
public class TestRecoveryPersistentBucketCache {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRecoveryPersistentBucketCache.class);

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;

  @Test
  public void testBucketCacheRecovery() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
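    // With the persister disabled, the persistence file is written only by the explicit
    // persistToFile() calls below, so the test controls exactly which cache state gets persisted.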
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
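    // A single 9KB bucket size: each 8KB test block (plus per-block overhead) should occupy
    // exactly one bucket slot, so evicting a block frees a slot at a predictable offset.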
    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(bucketCache.waitForCacheInitialization(1000));
    assertTrue(
      bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled());

    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);

    CacheTestUtils.HFileBlockPair[] smallerBlocks = CacheTestUtils.generateHFileBlocks(4096, 1);
    // Add four blocks
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
    // saves the current state of the cache
    bucketCache.persistToFile();
    // evicts the 4th block
    bucketCache.evictBlock(blocks[3].getBlockName());
    // now adds a 5th block to the bucket cache. This block is half the size of the previous
    // blocks, and it will be written at the same offset as the previously evicted block,
    // overwriting part of the 4th block. Because we persisted only up to the 4th block
    // addition, recovery will try to read the whole 4th block, but the cached time validation
    // will fail, and we'll recover only the first three blocks.
    cacheAndWaitUntilFlushedToBucket(bucketCache, smallerBlocks[0].getBlockName(),
      smallerBlocks[0].getBlock());

    // Creates a new bucket cache instance without persisting to file after evicting the 4th
    // block and caching the 5th block. Here the cache file has the first three blocks, followed
    // by the 5th block and the second half of the 4th block (we evicted the 4th block, freeing
    // up its offset in the cache, then added the 5th block, which is half the size of the other
    // blocks, so it overwrites the first half of the 4th block in the cache). That's fine
    // because the in-memory backing map has the right blocks and related offsets. However, the
    // persistent map file only has information about the first four blocks. We validate the
    // cached time recorded in the backing map against the block data in the cache. This is
    // recorded in the cache as the first 8 bytes of a block, so the 4th block had its first
    // 8 bytes overwritten by the 5th block, causing this check to fail and the removal of
    // the 4th block from the backing map.
    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(newBucketCache.waitForCacheInitialization(1000));

    assertEquals(3, newBucketCache.backingMap.size());
    assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
    assertNull(newBucketCache.getBlock(smallerBlocks[0].getBlockName(), false, false, false));
    assertEquals(blocks[0].getBlock(),
      newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false));
    assertEquals(blocks[1].getBlock(),
      newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
    assertEquals(blocks[2].getBlock(),
      newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
    TEST_UTIL.cleanupTestDir();
  }

  @Test
  public void testBucketCacheEvictByHFileAfterRecovery() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(bucketCache.waitForCacheInitialization(10000));

    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);

    // Add four blocks
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
    // saves the current state of the cache
    bucketCache.persistToFile();

    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(newBucketCache.waitForCacheInitialization(10000));
    assertEquals(4, newBucketCache.backingMap.size());
    newBucketCache.evictBlocksByHfileName(blocks[0].getBlockName().getHfileName());
    assertEquals(3, newBucketCache.backingMap.size());
    TEST_UTIL.cleanupTestDir();
  }

  @Test
  public void testValidateCacheInitialization() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(bucketCache.waitForCacheInitialization(10000));

    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);

    // Add four blocks
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
    // saves the current state of the cache
    bucketCache.persistToFile();

    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(newBucketCache.waitForCacheInitialization(10000));

    // Set the state of the bucket cache to INITIALIZING
    newBucketCache.setCacheState(BucketCache.CacheState.INITIALIZING);
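    // The INITIALIZING state covers the window where a persisted cache is still being retrieved
    // and validated; while in it, the size accessors are expected to short-circuit rather than
    // report figures for a partially restored cache.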

    // Validate that zero/empty values are returned while the cache is still initializing.
    assertEquals(0, newBucketCache.acceptableSize());
    assertEquals(0, newBucketCache.getPartitionSize(1));
    assertEquals(0, newBucketCache.getFreeSize());
    assertEquals(0, newBucketCache.getCurrentSize());
    assertFalse(newBucketCache.blockFitsIntoTheCache(blocks[0].getBlock()).get());

    newBucketCache.setCacheState(BucketCache.CacheState.ENABLED);

    // Validate that non-zero values are returned for an enabled cache
    assertTrue(newBucketCache.acceptableSize() > 0);
    assertTrue(newBucketCache.getPartitionSize(1) > 0);
    assertTrue(newBucketCache.getFreeSize() > 0);
    assertTrue(newBucketCache.getCurrentSize() > 0);
    assertTrue(newBucketCache.blockFitsIntoTheCache(blocks[0].getBlock()).get());

    TEST_UTIL.cleanupTestDir();
  }

  @Test
  public void testBucketCacheRecoveryWithAllocationInconsistencies() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    conf.setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
    conf.setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
    conf.setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
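    // Tighter eviction factors, presumably so free-space eviction doesn't kick in while this
    // deliberately small 36KB cache fills close to capacity with the 8KB blocks cached below.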
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024, 8192,
      bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(bucketCache.waitForCacheInitialization(1000));
    assertTrue(
      bucketCache.isCacheInitialized("testBucketCacheRecoveryWithAllocationInconsistencies")
        && bucketCache.isCacheEnabled());

    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 5);

    // Add four blocks; the 5th generated block is used below to fabricate an inconsistent entry
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
    // Creates an entry for a 5th block at the same cache offset as the 1st block. Adds it
    // straight to the backingMap, bypassing the caching path, in order to fabricate an
    // inconsistency.
    BucketEntry bucketEntry =
      new BucketEntry(bucketCache.backingMap.get(blocks[0].getBlockName()).offset(),
        blocks[4].getBlock().getSerializedLength(), blocks[4].getBlock().getOnDiskSizeWithHeader(),
        0, false, bucketCache::createRecycler, blocks[4].getBlock().getByteBuffAllocator());
    bucketEntry.setDeserializerReference(blocks[4].getBlock().getDeserializer());
    bucketCache.getBackingMap().put(blocks[4].getBlockName(), bucketEntry);

    // saves the current state of the cache: five blocks in the map, but only four were actually
    // cached. The 5th block's entry has the same cache offset as the first.
    bucketCache.persistToFile();

    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    while (!newBucketCache.getBackingMapValidated().get()) {
      Thread.sleep(10);
    }
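    // Unlike the other tests, this one polls getBackingMapValidated() directly: retrieval and
    // validation of the persisted map happen asynchronously after construction, and the
    // assertions below are only meaningful once that validation pass completes.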

    assertNull(newBucketCache.getBlock(blocks[4].getBlockName(), false, false, false));
    // The backing map entry with key blocks[0].getBlockName() may point to a valid entry or to
    // null, depending on the ordering of the keys in the backing map, so we skip the check for
    // that key.
    assertEquals(blocks[1].getBlock(),
      newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
    assertEquals(blocks[2].getBlock(),
      newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
    assertEquals(blocks[3].getBlock(),
      newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
    assertEquals(4, newBucketCache.backingMap.size());
    TEST_UTIL.cleanupTestDir();
  }

  private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
    throws InterruptedException {
    Waiter.waitFor(HBaseConfiguration.create(), 12000,
      () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey)));
  }

  // BucketCache.cacheBlock is async: it first adds the block to the ramCache and the writeQueue,
  // then the writer threads flush it to the bucket and put a reference entry in the backingMap.
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
    Cacheable block) throws InterruptedException {
    cache.cacheBlock(cacheKey, block);
    waitUntilFlushedToBucket(cache, cacheKey);
  }

}