001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; 021import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertNotEquals; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertTrue; 026 027import java.io.BufferedWriter; 028import java.io.File; 029import java.io.FileOutputStream; 030import java.io.OutputStreamWriter; 031import java.nio.file.FileSystems; 032import java.nio.file.Files; 033import java.nio.file.attribute.FileTime; 034import java.time.Instant; 035import java.util.Arrays; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseConfiguration; 040import org.apache.hadoop.hbase.HBaseTestingUtility; 041import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 042import org.apache.hadoop.hbase.io.hfile.CacheConfig; 043import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; 044import org.apache.hadoop.hbase.io.hfile.Cacheable; 045import org.apache.hadoop.hbase.testclassification.SmallTests; 046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 047import org.apache.hadoop.hbase.util.Pair; 048import org.junit.ClassRule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051import org.junit.runner.RunWith; 052import org.junit.runners.Parameterized; 053 054/** 055 * Basic test for check file's integrity before start BucketCache in fileIOEngine 056 */ 057@RunWith(Parameterized.class) 058@Category(SmallTests.class) 059public class TestVerifyBucketCacheFile { 060 @ClassRule 061 public static final HBaseClassTestRule CLASS_RULE = 062 HBaseClassTestRule.forClass(TestVerifyBucketCacheFile.class); 063 064 @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}") 065 public static Iterable<Object[]> data() { 066 return Arrays.asList(new Object[][] { { 8192, null }, 067 { 16 * 1024, 068 new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, 069 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 070 128 * 1024 + 1024 } } }); 071 } 072 073 @Parameterized.Parameter(0) 074 public int constructedBlockSize; 075 076 @Parameterized.Parameter(1) 077 public int[] constructedBlockSizes; 078 079 final long capacitySize = 32 * 1024 * 1024; 080 final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; 081 final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; 082 083 /** 084 * Test cache file or persistence file does not exist whether BucketCache starts normally (1) 085 * Start BucketCache and add some blocks, then shutdown BucketCache and persist cache to file. 086 * Restart BucketCache and it can restore cache from file. (2) Delete bucket cache file after 087 * shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the cache file 088 * and persistence file would be deleted before BucketCache start normally. (3) Delete persistence 089 * file after shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the 090 * cache file and persistence file would be deleted before BucketCache start normally. 091 * @throws Exception the exception 092 */ 093 @Test 094 public void testRetrieveFromFile() throws Exception { 095 HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 096 Path testDir = TEST_UTIL.getDataTestDir(); 097 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 098 099 BucketCache bucketCache = 100 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 101 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 102 long usedSize = bucketCache.getAllocator().getUsedSize(); 103 assertEquals(0, usedSize); 104 CacheTestUtils.HFileBlockPair[] blocks = 105 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 106 // Add blocks 107 for (CacheTestUtils.HFileBlockPair block : blocks) { 108 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 109 } 110 usedSize = bucketCache.getAllocator().getUsedSize(); 111 assertNotEquals(0, usedSize); 112 // 1.persist cache to file 113 bucketCache.shutdown(); 114 // restore cache from file 115 bucketCache = 116 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 117 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 118 assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); 119 // persist cache to file 120 bucketCache.shutdown(); 121 122 // 2.delete bucket cache file 123 final java.nio.file.Path cacheFile = 124 FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache"); 125 assertTrue(Files.deleteIfExists(cacheFile)); 126 // can't restore cache from file 127 bucketCache = 128 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 129 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 130 Thread.sleep(100); 131 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 132 assertEquals(0, bucketCache.backingMap.size()); 133 // Add blocks 134 for (CacheTestUtils.HFileBlockPair block : blocks) { 135 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 136 } 137 usedSize = bucketCache.getAllocator().getUsedSize(); 138 assertNotEquals(0, usedSize); 139 // persist cache to file 140 bucketCache.shutdown(); 141 142 // 3.delete backingMap persistence file 143 final java.nio.file.Path mapFile = 144 FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence"); 145 assertTrue(Files.deleteIfExists(mapFile)); 146 // can't restore cache from file 147 bucketCache = 148 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 149 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 150 Thread.sleep(100); 151 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 152 assertEquals(0, bucketCache.backingMap.size()); 153 154 TEST_UTIL.cleanupTestDir(); 155 } 156 157 @Test 158 public void testRetrieveFromFileAfterDelete() throws Exception { 159 160 HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 161 Path testDir = TEST_UTIL.getDataTestDir(); 162 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 163 Configuration conf = TEST_UTIL.getConfiguration(); 164 conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300); 165 String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 166 BucketCache bucketCache = 167 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 168 constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf); 169 170 long usedSize = bucketCache.getAllocator().getUsedSize(); 171 assertEquals(0, usedSize); 172 CacheTestUtils.HFileBlockPair[] blocks = 173 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 174 // Add blocks 175 for (CacheTestUtils.HFileBlockPair block : blocks) { 176 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 177 } 178 usedSize = bucketCache.getAllocator().getUsedSize(); 179 assertNotEquals(0, usedSize); 180 // Shutdown BucketCache 181 bucketCache.shutdown(); 182 // Delete the persistence file 183 File mapFile = new File(mapFileName); 184 assertTrue(mapFile.delete()); 185 Thread.sleep(350); 186 // Create BucketCache 187 bucketCache = 188 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 189 constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf); 190 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 191 assertEquals(0, bucketCache.backingMap.size()); 192 } 193 194 /** 195 * Test whether BucketCache is started normally after modifying the cache file. Start BucketCache 196 * and add some blocks, then shutdown BucketCache and persist cache to file. Restart BucketCache 197 * after modify cache file's data, and it can't restore cache from file, the cache file and 198 * persistence file would be deleted before BucketCache start normally. 199 * @throws Exception the exception 200 */ 201 @Test 202 public void testModifiedBucketCacheFileData() throws Exception { 203 HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 204 Path testDir = TEST_UTIL.getDataTestDir(); 205 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 206 207 Configuration conf = HBaseConfiguration.create(); 208 // Disables the persister thread by setting its interval to MAX_VALUE 209 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 210 BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 211 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 212 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 213 long usedSize = bucketCache.getAllocator().getUsedSize(); 214 assertEquals(0, usedSize); 215 216 CacheTestUtils.HFileBlockPair[] blocks = 217 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 218 // Add blocks 219 for (CacheTestUtils.HFileBlockPair block : blocks) { 220 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 221 } 222 usedSize = bucketCache.getAllocator().getUsedSize(); 223 assertNotEquals(0, usedSize); 224 // persist cache to file 225 bucketCache.shutdown(); 226 227 // modified bucket cache file 228 String file = testDir + "/bucket.cache"; 229 try (BufferedWriter out = 230 new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, false)))) { 231 out.write("test bucket cache"); 232 } 233 // can't restore cache from file 234 bucketCache = 235 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 236 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 237 Thread.sleep(100); 238 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 239 assertEquals(0, bucketCache.backingMap.size()); 240 241 TEST_UTIL.cleanupTestDir(); 242 } 243 244 /** 245 * Test whether BucketCache is started normally after modifying the cache file's last modified 246 * time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist cache 247 * to file. Then Restart BucketCache after modify cache file's last modified time. HBASE-XXXX has 248 * modified persistence cache such that now we store extra 8 bytes at the end of each block in the 249 * cache, representing the nanosecond time the block has been cached. So in the event the cache 250 * file has failed checksum verification during loading time, we go through all the cached blocks 251 * in the cache map and validate the cached time long between what is in the map and the cache 252 * file. If that check fails, we pull the cache key entry out of the map. Since in this test we 253 * are only modifying the access time to induce a checksum error, the cache file content is still 254 * valid and the extra verification should validate that all cache keys in the map are still 255 * recoverable from the cache. 256 * @throws Exception the exception 257 */ 258 @Test 259 public void testModifiedBucketCacheFileTime() throws Exception { 260 HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 261 Path testDir = TEST_UTIL.getDataTestDir(); 262 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 263 264 BucketCache bucketCache = 265 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 266 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 267 long usedSize = bucketCache.getAllocator().getUsedSize(); 268 assertEquals(0, usedSize); 269 270 Pair<String, Long> myPair = new Pair<>(); 271 272 CacheTestUtils.HFileBlockPair[] blocks = 273 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 274 // Add blocks 275 for (CacheTestUtils.HFileBlockPair block : blocks) { 276 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 277 } 278 usedSize = bucketCache.getAllocator().getUsedSize(); 279 assertNotEquals(0, usedSize); 280 long blockCount = bucketCache.backingMap.size(); 281 assertNotEquals(0, blockCount); 282 // persist cache to file 283 bucketCache.shutdown(); 284 285 // modified bucket cache file LastModifiedTime 286 final java.nio.file.Path file = 287 FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache"); 288 Files.setLastModifiedTime(file, FileTime.from(Instant.now().plusMillis(1_000))); 289 // can't restore cache from file 290 bucketCache = 291 new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, 292 constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); 293 assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); 294 assertEquals(blockCount, bucketCache.backingMap.size()); 295 296 TEST_UTIL.cleanupTestDir(); 297 } 298 299 /** 300 * When using persistent bucket cache, there may be crashes between persisting the backing map and 301 * syncing new blocks to the cache file itself, leading to an inconsistent state between the cache 302 * keys and the cached data. This is to make sure the cache keys are updated accordingly, and the 303 * keys that are still valid do succeed in retrieve related block data from the cache without any 304 * corruption. 305 * @throws Exception the exception 306 */ 307 @Test 308 public void testBucketCacheRecovery() throws Exception { 309 HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 310 Path testDir = TEST_UTIL.getDataTestDir(); 311 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 312 Configuration conf = HBaseConfiguration.create(); 313 // Disables the persister thread by setting its interval to MAX_VALUE 314 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 315 String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 316 BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 317 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 318 DEFAULT_ERROR_TOLERATION_DURATION, conf); 319 320 CacheTestUtils.HFileBlockPair[] blocks = 321 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4); 322 // Add three blocks 323 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock()); 324 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock()); 325 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock()); 326 // saves the current state 327 bucketCache.persistToFile(); 328 // evicts first block 329 bucketCache.evictBlock(blocks[0].getBlockName()); 330 331 // now adds a fourth block to bucket cache 332 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock()); 333 // Creates new bucket cache instance without persisting to file after evicting first block 334 // and caching fourth block. So the bucket cache file has only the last three blocks, 335 // but backing map (containing cache keys) was persisted when first three blocks 336 // were in the cache. So the state on this recovery is: 337 // - Backing map: [block0, block1, block2] 338 // - Cache: [block1, block2, block3] 339 // Therefore, this bucket cache would be able to recover only block1 and block2. 340 BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 341 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 342 DEFAULT_ERROR_TOLERATION_DURATION, conf); 343 344 assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false)); 345 assertEquals(blocks[1].getBlock(), 346 newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false)); 347 assertEquals(blocks[2].getBlock(), 348 newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false)); 349 assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false)); 350 assertEquals(2, newBucketCache.backingMap.size()); 351 TEST_UTIL.cleanupTestDir(); 352 } 353 354 private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) 355 throws InterruptedException { 356 while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { 357 Thread.sleep(100); 358 } 359 } 360 361 // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer 362 // threads will flush it to the bucket and put reference entry in backingMap. 363 private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, 364 Cacheable block) throws InterruptedException { 365 cache.cacheBlock(cacheKey, block); 366 waitUntilFlushedToBucket(cache, cacheKey); 367 } 368}