/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Basic tests that check the integrity of the cache file and of the persistence file before a
 * file-backed BucketCache is started.
 */
@RunWith(Parameterized.class)
@Category(SmallTests.class)
public class TestVerifyBucketCacheFile {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestVerifyBucketCacheFile.class);

  /** Upper bound, in milliseconds, for a cached block to be flushed from ramCache to a bucket. */
  private static final long FLUSH_TIMEOUT_MS = 10000;

  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] { { 8192, null },
      { 16 * 1024,
        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
          128 * 1024 + 1024 } } });
  }

  @Parameterized.Parameter(0)
  public int constructedBlockSize;

  @Parameterized.Parameter(1)
  public int[] constructedBlockSizes;

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;

  /**
   * Tests whether BucketCache starts normally when the cache file or the persistence file does
   * not exist.
   * <ol>
   * <li>Start BucketCache and add some blocks, then shutdown BucketCache and persist cache to
   * file. Restart BucketCache and it can restore cache from file.</li>
   * <li>Delete bucket cache file after shutdown BucketCache. Restart BucketCache and it can't
   * restore cache from file, the cache file and persistence file would be deleted before
   * BucketCache start normally.</li>
   * <li>Delete persistence file after shutdown BucketCache. Restart BucketCache and it can't
   * restore cache from file, the cache file and persistence file would be deleted before
   * BucketCache start normally.</li>
   * </ol>
   * @throws Exception if the cache cannot be created or a file operation fails
   */
  @Test
  public void testRetrieveFromFile() throws Exception {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    Path testDir = testUtil.getDataTestDir();
    // Always points at the most recently created cache so that the finally block can release it.
    BucketCache bucketCache = null;
    try {
      testUtil.getTestFileSystem().mkdirs(testDir);

      bucketCache =
        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      CacheTestUtils.HFileBlockPair[] blocks =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      // Add blocks
      cacheAllAndWaitUntilFlushed(bucketCache, blocks);
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      // 1. persist cache to file
      bucketCache.shutdown();
      // restore cache from file
      bucketCache =
        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
      // persist cache to file
      bucketCache.shutdown();

      // 2. delete bucket cache file
      final java.nio.file.Path cacheFile =
        FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
      assertTrue(Files.deleteIfExists(cacheFile));
      // can't restore cache from file
      final BucketCache recoveredBucketCache =
        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
      bucketCache = recoveredBucketCache;
      assertTrue(recoveredBucketCache.waitForCacheInitialization(10000));
      Waiter.waitFor(HBaseConfiguration.create(), 1000,
        () -> recoveredBucketCache.getBackingMapValidated().get());
      assertEquals(0, recoveredBucketCache.getAllocator().getUsedSize());
      assertEquals(0, recoveredBucketCache.backingMap.size());
      // Add blocks
      cacheAllAndWaitUntilFlushed(recoveredBucketCache, blocks);
      usedSize = recoveredBucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      // persist cache to file
      recoveredBucketCache.shutdown();

      // 3. delete backingMap persistence file
      final java.nio.file.Path mapFile =
        FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence");
      assertTrue(Files.deleteIfExists(mapFile));
      // can't restore cache from file
      bucketCache =
        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      shutdownAndCleanup(testUtil, bucketCache);
    }
  }

  /**
   * Tests that a BucketCache started after its persistence file was deleted comes up empty. The
   * first cache runs with a 300 ms persist interval; the test waits 350 ms after the shutdown and
   * the file deletion before starting the second cache.
   * @throws Exception if the cache cannot be created or a file operation fails
   */
  @Test
  public void testRetrieveFromFileAfterDelete() throws Exception {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    Path testDir = testUtil.getDataTestDir();
    BucketCache bucketCache = null;
    try {
      testUtil.getTestFileSystem().mkdirs(testDir);
      Configuration conf = testUtil.getConfiguration();
      conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300);
      String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
      bucketCache =
        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));

      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      CacheTestUtils.HFileBlockPair[] blocks =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      // Add blocks
      cacheAllAndWaitUntilFlushed(bucketCache, blocks);
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      // Shutdown BucketCache
      bucketCache.shutdown();
      // Delete the persistence file
      File mapFile = new File(mapFileName);
      assertTrue(mapFile.delete());
      // Wait for longer than the configured persist interval (300 ms) before restarting.
      Thread.sleep(350);
      // Create BucketCache
      bucketCache =
        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      shutdownAndCleanup(testUtil, bucketCache);
    }
  }

  /**
   * Test whether BucketCache is started normally after modifying the cache file. Start BucketCache
   * and add some blocks, then shutdown BucketCache and persist cache to file. Restart BucketCache
   * after modify cache file's data, and it can't restore cache from file, the cache file and
   * persistence file would be deleted before BucketCache start normally.
   * @throws Exception if the cache cannot be created or a file operation fails
   */
  @Test
  public void testModifiedBucketCacheFileData() throws Exception {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    Path testDir = testUtil.getDataTestDir();
    BucketCache bucketCache = null;
    try {
      testUtil.getTestFileSystem().mkdirs(testDir);

      Configuration conf = HBaseConfiguration.create();
      // Disables the persister thread by setting its interval to MAX_VALUE
      conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);

      CacheTestUtils.HFileBlockPair[] blocks =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      // Add blocks
      cacheAllAndWaitUntilFlushed(bucketCache, blocks);
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      // persist cache to file
      bucketCache.shutdown();

      // overwrite the bucket cache file with unrelated content
      String file = testDir + "/bucket.cache";
      try (BufferedWriter out =
        new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, false)))) {
        out.write("test bucket cache");
      }
      // can't restore cache from file
      bucketCache =
        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      shutdownAndCleanup(testUtil, bucketCache);
    }
  }

  /**
   * Test whether BucketCache is started normally after modifying the cache file's last modified
   * time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist cache
   * to file. Then Restart BucketCache after modify cache file's last modified time. HBASE-XXXX has
   * modified persistence cache such that now we store extra 8 bytes at the end of each block in the
   * cache, representing the nanosecond time the block has been cached. So in the event the cache
   * file has failed checksum verification during loading time, we go through all the cached blocks
   * in the cache map and validate the cached time long between what is in the map and the cache
   * file. If that check fails, we pull the cache key entry out of the map. Since in this test we
   * are only modifying the last modified time to induce a checksum error, the cache file content
   * is still valid and the extra verification should validate that all cache keys in the map are
   * still recoverable from the cache.
   * @throws Exception if the cache cannot be created or a file operation fails
   */
  @Test
  public void testModifiedBucketCacheFileTime() throws Exception {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    Path testDir = testUtil.getDataTestDir();
    BucketCache bucketCache = null;
    try {
      testUtil.getTestFileSystem().mkdirs(testDir);

      bucketCache =
        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);

      CacheTestUtils.HFileBlockPair[] blocks =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      // Add blocks
      cacheAllAndWaitUntilFlushed(bucketCache, blocks);
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      long blockCount = bucketCache.backingMap.size();
      assertNotEquals(0, blockCount);
      // persist cache to file
      bucketCache.shutdown();

      // modify the bucket cache file's last modified time
      final java.nio.file.Path file =
        FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
      Files.setLastModifiedTime(file, FileTime.from(Instant.now().plusMillis(1_000)));
      // only the timestamp changed, the content is intact: every block must still be recovered
      bucketCache =
        new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
          constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
      assertEquals(blockCount, bucketCache.backingMap.size());
    } finally {
      shutdownAndCleanup(testUtil, bucketCache);
    }
  }

  /**
   * When using persistent bucket cache, there may be crashes between persisting the backing map and
   * syncing new blocks to the cache file itself, leading to an inconsistent state between the cache
   * keys and the cached data. This is to make sure the cache keys are updated accordingly, and the
   * keys that are still valid do succeed in retrieve related block data from the cache without any
   * corruption.
   * @throws Exception if the cache cannot be created or a file operation fails
   */
  @Test
  public void testBucketCacheRecovery() throws Exception {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    Path testDir = testUtil.getDataTestDir();
    BucketCache bucketCache = null;
    BucketCache newBucketCache = null;
    try {
      testUtil.getTestFileSystem().mkdirs(testDir);
      Configuration conf = HBaseConfiguration.create();
      // Disables the persister thread by setting its interval to MAX_VALUE
      conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
      String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
        DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));

      CacheTestUtils.HFileBlockPair[] blocks =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4);
      // Add three blocks
      for (int i = 0; i < 3; i++) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(),
          blocks[i].getBlock());
      }
      // saves the current state
      bucketCache.persistToFile();
      // evicts first block
      bucketCache.evictBlock(blocks[0].getBlockName());

      // now adds a fourth block to bucket cache
      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(),
        blocks[3].getBlock());
      // Creates new bucket cache instance without persisting to file after evicting first block
      // and caching fourth block. So the bucket cache file has only the last three blocks,
      // but backing map (containing cache keys) was persisted when first three blocks
      // were in the cache. So the state on this recovery is:
      // - Backing map: [block0, block1, block2]
      // - Cache: [block1, block2, block3]
      // Therefore, this bucket cache would be able to recover only block1 and block2.
      newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
        DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(newBucketCache.waitForCacheInitialization(10000));

      assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false));
      assertEquals(blocks[1].getBlock(),
        newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
      assertEquals(blocks[2].getBlock(),
        newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
      assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
      assertEquals(2, newBucketCache.backingMap.size());
    } finally {
      // The first cache is deliberately left running during the assertions above; it is only
      // shut down here, once the recovery has been verified.
      shutdownAndCleanup(testUtil, bucketCache, newBucketCache);
    }
  }

  @Test
  public void testSingleChunk() throws Exception {
    testChunkedBackingMapRecovery(5, 5);
  }

  @Test
  public void testCompletelyFilledChunks() throws Exception {
    // Test where the all the chunks are complete with chunkSize entries
    testChunkedBackingMapRecovery(5, 10);
  }

  @Test
  public void testPartiallyFilledChunks() throws Exception {
    // Test where the last chunk is not completely filled.
    testChunkedBackingMapRecovery(5, 13);
  }

  /**
   * Caches {@code numBlocks} blocks, persists the backing map with the given chunk size, and
   * verifies that a second cache reading the same persistence file sees every block.
   * @param chunkSize value stored under {@code BACKING_MAP_PERSISTENCE_CHUNK_SIZE}
   * @param numBlocks number of blocks to cache and expect after recovery
   * @throws Exception if the cache cannot be created or a file operation fails
   */
  private void testChunkedBackingMapRecovery(int chunkSize, int numBlocks) throws Exception {
    HBaseTestingUtil testUtil = new HBaseTestingUtil();
    Path testDir = testUtil.getDataTestDir();
    BucketCache bucketCache = null;
    BucketCache newBucketCache = null;
    try {
      testUtil.getTestFileSystem().mkdirs(testDir);
      Configuration conf = HBaseConfiguration.create();
      conf.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, chunkSize);

      String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
        DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));

      CacheTestUtils.HFileBlockPair[] blocks =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks);

      for (int i = 0; i < numBlocks; i++) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(),
          blocks[i].getBlock());
      }

      // saves the current state
      bucketCache.persistToFile();

      // Create a new bucket which reads from persistence file.
      newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
        DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(newBucketCache.waitForCacheInitialization(10000));
      assertEquals(numBlocks, newBucketCache.backingMap.size());

      for (int i = 0; i < numBlocks; i++) {
        assertEquals(blocks[i].getBlock(),
          newBucketCache.getBlock(blocks[i].getBlockName(), false, false, false));
      }
    } finally {
      shutdownAndCleanup(testUtil, bucketCache, newBucketCache);
    }
  }

  /**
   * Shuts down every non-null cache and then removes the test directory. The directory cleanup
   * runs even if a shutdown throws, so a failing test does not leave files behind.
   * @param testUtil the testing utility that owns the test directory
   * @param caches   the caches to shut down; {@code null} entries are skipped
   * @throws Exception if the test directory cannot be cleaned up
   */
  private static void shutdownAndCleanup(HBaseTestingUtil testUtil, BucketCache... caches)
    throws Exception {
    try {
      for (BucketCache cache : caches) {
        if (cache != null) {
          cache.shutdown();
        }
      }
    } finally {
      testUtil.cleanupTestDir();
    }
  }

  /**
   * Blocks until the given key is present in the backing map and no longer in the ram cache. Fails
   * through {@link Waiter} instead of spinning forever if that does not happen within
   * {@link #FLUSH_TIMEOUT_MS}.
   */
  private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
    throws InterruptedException {
    Waiter.waitFor(HBaseConfiguration.create(), FLUSH_TIMEOUT_MS,
      () -> cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey));
  }

  // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
  // threads will flush it to the bucket and put reference entry in backingMap.
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
    Cacheable block) throws InterruptedException {
    cache.cacheBlock(cacheKey, block);
    waitUntilFlushedToBucket(cache, cacheKey);
  }

  /** Caches each of the given blocks in order, waiting for every one to be flushed to a bucket. */
  private void cacheAllAndWaitUntilFlushed(BucketCache cache,
    CacheTestUtils.HFileBlockPair[] blocks) throws InterruptedException {
    for (CacheTestUtils.HFileBlockPair block : blocks) {
      cacheAndWaitUntilFlushedToBucket(cache, block.getBlockName(), block.getBlock());
    }
  }
}