/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Basic test for checking the cache file's integrity before starting a BucketCache backed by the
 * FileIOEngine.
 */
@Category(SmallTests.class)
public class TestRecoveryPersistentBucketCache {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRecoveryPersistentBucketCache.class);

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;

  @Test
  public void testBucketCacheRecovery() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(bucketCache.waitForCacheInitialization(1000));
    assertTrue(
      bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled());
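
    // Generate four full-size (8KB) blocks plus one half-size (4KB) block; the smaller block is
    // used further below to partially overwrite the slot freed by an evicted full-size block.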
    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);

    CacheTestUtils.HFileBlockPair[] smallerBlocks = CacheTestUtils.generateHFileBlocks(4096, 1);
    // Add four blocks
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
    // saves the current state of the cache
    bucketCache.persistToFile();
    // evicts the 4th block
    bucketCache.evictBlock(blocks[3].getBlockName());
    // now adds a 5th block to the bucket cache. This block is half the size of the previous
    // blocks, and it will be added at the same offset as the previously evicted block,
    // overwriting part of the 4th block. Because we persisted only up to the 4th block addition,
    // recovery will try to read the whole 4th block, but the cached time validation will fail,
    // and we'll recover only the first three blocks
    cacheAndWaitUntilFlushedToBucket(bucketCache, smallerBlocks[0].getBlockName(),
      smallerBlocks[0].getBlock());

    // Creates a new bucket cache instance without persisting to file after evicting the 4th
    // block and caching the 5th block. Here the cache file has the first three blocks, followed
    // by the 5th block and the second half of the 4th block (we evicted the 4th block, freeing up
    // its offset in the cache, then added the 5th block, which is half the size of the other
    // blocks, so it overwrites the first half of the 4th block in the cache). That's fine,
    // because the in-memory backing map has the right blocks and related offsets. However, the
    // persistent map file only has information about the first four blocks. We validate the
    // cached time recorded in the backing map against the block data in the cache. This is
    // recorded in the cache as the first 8 bytes of a block, so the 4th block now has its first
    // 8 bytes overwritten by the 5th block, causing this check to fail and the 4th block to be
    // removed from the backing map.
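    // Reopening the cache against the same persistence file exercises that recovery path,
    // including the cached-time validation described above.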
    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(newBucketCache.waitForCacheInitialization(1000));

    assertEquals(3, newBucketCache.backingMap.size());
    assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
    assertNull(newBucketCache.getBlock(smallerBlocks[0].getBlockName(), false, false, false));
    assertEquals(blocks[0].getBlock(),
      newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false));
    assertEquals(blocks[1].getBlock(),
      newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
    assertEquals(blocks[2].getBlock(),
      newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
    TEST_UTIL.cleanupTestDir();
  }

  @Test
  public void testBucketCacheEvictByHFileAfterRecovery() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(bucketCache.waitForCacheInitialization(10000));

    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);

    // Add four blocks
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
    // saves the current state of the cache
    bucketCache.persistToFile();

    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(newBucketCache.waitForCacheInitialization(10000));
    assertEquals(4, newBucketCache.backingMap.size());
    newBucketCache.evictBlocksByHfileName(blocks[0].getBlockName().getHfileName());
    assertEquals(3, newBucketCache.backingMap.size());
    TEST_UTIL.cleanupTestDir();
  }

  @Test
  public void testValidateCacheInitialization() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
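    // Cache initialization (including any recovery from the persistence file) is asynchronous,
    // so wait for it to complete before caching blocks.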
    assertTrue(bucketCache.waitForCacheInitialization(10000));

    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);

    // Add four blocks
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
    // saves the current state of the cache
    bucketCache.persistToFile();

    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(newBucketCache.waitForCacheInitialization(10000));

    // Set the state of the bucket cache to INITIALIZING
    newBucketCache.setCacheState(BucketCache.CacheState.INITIALIZING);

    // Validate that zero values are returned while the cache is being initialized.
    assertEquals(0, newBucketCache.acceptableSize());
    assertEquals(0, newBucketCache.getPartitionSize(1));
    assertEquals(0, newBucketCache.getFreeSize());
    assertEquals(0, newBucketCache.getCurrentSize());
    assertEquals(false, newBucketCache.blockFitsIntoTheCache(blocks[0].getBlock()).get());

    newBucketCache.setCacheState(BucketCache.CacheState.ENABLED);

    // Validate that non-zero values are returned for the enabled cache
    assertTrue(newBucketCache.acceptableSize() > 0);
    assertTrue(newBucketCache.getPartitionSize(1) > 0);
    assertTrue(newBucketCache.getFreeSize() > 0);
    assertTrue(newBucketCache.getCurrentSize() > 0);
    assertTrue(newBucketCache.blockFitsIntoTheCache(blocks[0].getBlock()).get());

    TEST_UTIL.cleanupTestDir();
  }

  @Test
  public void testBucketCacheRecoveryWithAllocationInconsistencies() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    conf.setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
    conf.setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
    conf.setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024, 8192,
      bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    assertTrue(bucketCache.waitForCacheInitialization(1000));
    assertTrue(
      bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled());

    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 5);

    // Add four blocks
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
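
    // During recovery, validation should detect the offset collision created below and drop at
    // least the hand-crafted 5th entry, as asserted at the end of this test.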
    // Creates an entry for a 5th block with the same cache offset as the 1st block. Just add it
    // straight to the backingMap, bypassing caching, in order to fabricate an inconsistency
    BucketEntry bucketEntry =
      new BucketEntry(bucketCache.backingMap.get(blocks[0].getBlockName()).offset(),
        blocks[4].getBlock().getSerializedLength(), blocks[4].getBlock().getOnDiskSizeWithHeader(),
        0, false, bucketCache::createRecycler, blocks[4].getBlock().getByteBuffAllocator());
    bucketEntry.setDeserializerReference(blocks[4].getBlock().getDeserializer());
    bucketCache.getBackingMap().put(blocks[4].getBlockName(), bucketEntry);

    // saves the current state of the cache: 5 blocks in the map, but we have only cached 4. The
    // 5th block has the same cache offset as the first
    bucketCache.persistToFile();

    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024,
      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
    while (!newBucketCache.getBackingMapValidated().get()) {
      Thread.sleep(10);
    }

    assertNull(newBucketCache.getBlock(blocks[4].getBlockName(), false, false, false));
    // The backing map entry with key blocks[0].getBlockName() may point to a valid entry or to
    // null, depending on the ordering of the keys in the backing map. Hence, we skip the check
    // for that key.
    assertEquals(blocks[1].getBlock(),
      newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
    assertEquals(blocks[2].getBlock(),
      newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
    assertEquals(blocks[3].getBlock(),
      newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
    assertEquals(4, newBucketCache.backingMap.size());
    TEST_UTIL.cleanupTestDir();
  }

  private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
    throws InterruptedException {
    Waiter.waitFor(HBaseConfiguration.create(), 12000,
      () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey)));
  }

  // BucketCache.cacheBlock is async: it first adds the block to the ramCache and the writeQueue;
  // writer threads then flush it to the bucket and put a reference entry in the backingMap.
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
    Cacheable block) throws InterruptedException {
    cache.cacheBlock(cacheKey, block);
    waitUntilFlushedToBucket(cache, cacheKey);
  }
}