001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; 021import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE; 022import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertNotEquals; 025import static org.junit.Assert.assertNull; 026import static org.junit.Assert.assertTrue; 027 028import java.io.BufferedWriter; 029import java.io.File; 030import java.io.FileOutputStream; 031import java.io.OutputStreamWriter; 032import java.nio.file.FileSystems; 033import java.nio.file.Files; 034import java.nio.file.attribute.FileTime; 035import java.time.Instant; 036import java.util.Arrays; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseConfiguration; 041import org.apache.hadoop.hbase.HBaseTestingUtil; 042import org.apache.hadoop.hbase.Waiter; 043import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 044import org.apache.hadoop.hbase.io.hfile.CacheConfig; 045import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; 046import org.apache.hadoop.hbase.io.hfile.Cacheable; 047import org.apache.hadoop.hbase.testclassification.SmallTests; 048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 049import org.apache.hadoop.hbase.util.Pair; 050import org.junit.ClassRule; 051import org.junit.Test; 052import org.junit.experimental.categories.Category; 053import org.junit.runner.RunWith; 054import org.junit.runners.Parameterized; 055 056/** 057 * Basic test for check file's integrity before start BucketCache in fileIOEngine 058 */ 059@RunWith(Parameterized.class) 060@Category(SmallTests.class) 061public class TestVerifyBucketCacheFile { 062 @ClassRule 063 public static final HBaseClassTestRule CLASS_RULE = 064 HBaseClassTestRule.forClass(TestVerifyBucketCacheFile.class); 065 066 @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}") 067 public static Iterable<Object[]> data() { 068 return Arrays.asList(new Object[][] { { 8192, null }, 069 { 16 * 1024, 070 new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, 071 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 072 128 * 1024 + 1024 } } }); 073 } 074 075 @Parameterized.Parameter(0) 076 public int constructedBlockSize; 077 078 @Parameterized.Parameter(1) 079 public int[] constructedBlockSizes; 080 081 final long capacitySize = 32 * 1024 * 1024; 082 final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; 083 final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; 084 085 /** 086 * Test cache file or persistence file does not exist whether BucketCache starts normally (1) 087 * Start BucketCache and add some blocks, then shutdown BucketCache and persist cache to file. 088 * Restart BucketCache and it can restore cache from file. (2) Delete bucket cache file after 089 * shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the cache file 090 * and persistence file would be deleted before BucketCache start normally. (3) Delete persistence 091 * file after shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the 092 * cache file and persistence file would be deleted before BucketCache start normally. 093 * @throws Exception the exception 094 */ 095 @Test 096 public void testRetrieveFromFile() throws Exception { 097 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 098 Path testDir = TEST_UTIL.getDataTestDir(); 099 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 100 101 Configuration conf = HBaseConfiguration.create(); 102 // Disables the persister thread by setting its interval to MAX_VALUE 103 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 104 105 BucketCache bucketCache = null; 106 BucketCache recoveredBucketCache = null; 107 try { 108 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 109 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 110 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 111 assertTrue(bucketCache.waitForCacheInitialization(10000)); 112 long usedSize = bucketCache.getAllocator().getUsedSize(); 113 assertEquals(0, usedSize); 114 CacheTestUtils.HFileBlockPair[] blocks = 115 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 116 // Add blocks 117 for (CacheTestUtils.HFileBlockPair block : blocks) { 118 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 119 } 120 usedSize = bucketCache.getAllocator().getUsedSize(); 121 assertNotEquals(0, usedSize); 122 // 1.persist cache to file 123 bucketCache.shutdown(); 124 // restore cache from file 125 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 126 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 127 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 128 assertTrue(bucketCache.waitForCacheInitialization(10000)); 129 waitPersistentCacheValidation(conf, bucketCache); 130 assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); 131 // persist cache to file 132 bucketCache.shutdown(); 133 134 // 2.delete bucket cache file 135 final java.nio.file.Path cacheFile = 136 FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache"); 137 assertTrue(Files.deleteIfExists(cacheFile)); 138 // can't restore cache from file 139 recoveredBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 140 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 141 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 142 assertTrue(recoveredBucketCache.waitForCacheInitialization(10000)); 143 waitPersistentCacheValidation(conf, recoveredBucketCache); 144 assertEquals(0, recoveredBucketCache.getAllocator().getUsedSize()); 145 assertEquals(0, recoveredBucketCache.backingMap.size()); 146 // Add blocks 147 for (CacheTestUtils.HFileBlockPair block : blocks) { 148 cacheAndWaitUntilFlushedToBucket(recoveredBucketCache, block.getBlockName(), 149 block.getBlock()); 150 } 151 usedSize = recoveredBucketCache.getAllocator().getUsedSize(); 152 assertNotEquals(0, usedSize); 153 // persist cache to file 154 recoveredBucketCache.shutdown(); 155 156 // 3.delete backingMap persistence file 157 final java.nio.file.Path mapFile = 158 FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence"); 159 assertTrue(Files.deleteIfExists(mapFile)); 160 // can't restore cache from file 161 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 162 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 163 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 164 assertTrue(bucketCache.waitForCacheInitialization(10000)); 165 waitPersistentCacheValidation(conf, bucketCache); 166 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 167 assertEquals(0, bucketCache.backingMap.size()); 168 } finally { 169 if (bucketCache != null) { 170 bucketCache.shutdown(); 171 } 172 if (recoveredBucketCache != null) { 173 recoveredBucketCache.shutdown(); 174 } 175 } 176 TEST_UTIL.cleanupTestDir(); 177 } 178 179 @Test 180 public void testRetrieveFromFileAfterDelete() throws Exception { 181 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 182 Path testDir = TEST_UTIL.getDataTestDir(); 183 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 184 Configuration conf = TEST_UTIL.getConfiguration(); 185 conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300); 186 String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 187 BucketCache bucketCache = null; 188 try { 189 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 190 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 191 DEFAULT_ERROR_TOLERATION_DURATION, conf); 192 assertTrue(bucketCache.waitForCacheInitialization(10000)); 193 long usedSize = bucketCache.getAllocator().getUsedSize(); 194 assertEquals(0, usedSize); 195 CacheTestUtils.HFileBlockPair[] blocks = 196 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 197 // Add blocks 198 for (CacheTestUtils.HFileBlockPair block : blocks) { 199 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 200 } 201 usedSize = bucketCache.getAllocator().getUsedSize(); 202 assertNotEquals(0, usedSize); 203 // Shutdown BucketCache 204 bucketCache.shutdown(); 205 // Delete the persistence file 206 File mapFile = new File(mapFileName); 207 assertTrue(mapFile.delete()); 208 Thread.sleep(350); 209 // Create BucketCache 210 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 211 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 212 DEFAULT_ERROR_TOLERATION_DURATION, conf); 213 assertTrue(bucketCache.waitForCacheInitialization(10000)); 214 waitPersistentCacheValidation(conf, bucketCache); 215 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 216 assertEquals(0, bucketCache.backingMap.size()); 217 } finally { 218 if (bucketCache != null) { 219 bucketCache.shutdown(); 220 } 221 } 222 } 223 224 /** 225 * Test whether BucketCache is started normally after modifying the cache file. Start BucketCache 226 * and add some blocks, then shutdown BucketCache and persist cache to file. Restart BucketCache 227 * after modify cache file's data, and it can't restore cache from file, the cache file and 228 * persistence file would be deleted before BucketCache start normally. 229 * @throws Exception the exception 230 */ 231 @Test 232 public void testModifiedBucketCacheFileData() throws Exception { 233 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 234 Path testDir = TEST_UTIL.getDataTestDir(); 235 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 236 237 Configuration conf = HBaseConfiguration.create(); 238 // Disables the persister thread by setting its interval to MAX_VALUE 239 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 240 BucketCache bucketCache = null; 241 try { 242 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 243 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 244 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 245 assertTrue(bucketCache.waitForCacheInitialization(10000)); 246 long usedSize = bucketCache.getAllocator().getUsedSize(); 247 assertEquals(0, usedSize); 248 249 CacheTestUtils.HFileBlockPair[] blocks = 250 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 251 // Add blocks 252 for (CacheTestUtils.HFileBlockPair block : blocks) { 253 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 254 } 255 usedSize = bucketCache.getAllocator().getUsedSize(); 256 assertNotEquals(0, usedSize); 257 // persist cache to file 258 bucketCache.shutdown(); 259 260 // modified bucket cache file 261 String file = testDir + "/bucket.cache"; 262 try (BufferedWriter out = 263 new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, false)))) { 264 out.write("test bucket cache"); 265 } 266 // can't restore cache from file 267 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 268 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 269 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 270 assertTrue(bucketCache.waitForCacheInitialization(10000)); 271 waitPersistentCacheValidation(conf, bucketCache); 272 assertEquals(0, bucketCache.getAllocator().getUsedSize()); 273 assertEquals(0, bucketCache.backingMap.size()); 274 } finally { 275 if (bucketCache != null) { 276 bucketCache.shutdown(); 277 } 278 } 279 TEST_UTIL.cleanupTestDir(); 280 } 281 282 /** 283 * Test whether BucketCache is started normally after modifying the cache file's last modified 284 * time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist cache 285 * to file. Then Restart BucketCache after modify cache file's last modified time. HBASE-XXXX has 286 * modified persistence cache such that now we store extra 8 bytes at the end of each block in the 287 * cache, representing the nanosecond time the block has been cached. So in the event the cache 288 * file has failed checksum verification during loading time, we go through all the cached blocks 289 * in the cache map and validate the cached time long between what is in the map and the cache 290 * file. If that check fails, we pull the cache key entry out of the map. Since in this test we 291 * are only modifying the access time to induce a checksum error, the cache file content is still 292 * valid and the extra verification should validate that all cache keys in the map are still 293 * recoverable from the cache. 294 * @throws Exception the exception 295 */ 296 @Test 297 public void testModifiedBucketCacheFileTime() throws Exception { 298 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 299 Path testDir = TEST_UTIL.getDataTestDir(); 300 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 301 Configuration conf = HBaseConfiguration.create(); 302 // Disables the persister thread by setting its interval to MAX_VALUE 303 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 304 BucketCache bucketCache = null; 305 try { 306 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 307 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 308 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 309 assertTrue(bucketCache.waitForCacheInitialization(10000)); 310 long usedSize = bucketCache.getAllocator().getUsedSize(); 311 assertEquals(0, usedSize); 312 313 Pair<String, Long> myPair = new Pair<>(); 314 315 CacheTestUtils.HFileBlockPair[] blocks = 316 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); 317 // Add blocks 318 for (CacheTestUtils.HFileBlockPair block : blocks) { 319 cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); 320 } 321 usedSize = bucketCache.getAllocator().getUsedSize(); 322 assertNotEquals(0, usedSize); 323 long blockCount = bucketCache.backingMap.size(); 324 assertNotEquals(0, blockCount); 325 // persist cache to file 326 bucketCache.shutdown(); 327 328 // modified bucket cache file LastModifiedTime 329 final java.nio.file.Path file = 330 FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache"); 331 Files.setLastModifiedTime(file, FileTime.from(Instant.now().plusMillis(1_000))); 332 // can't restore cache from file 333 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 334 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 335 testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); 336 assertTrue(bucketCache.waitForCacheInitialization(10000)); 337 waitPersistentCacheValidation(conf, bucketCache); 338 assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); 339 assertEquals(blockCount, bucketCache.backingMap.size()); 340 } finally { 341 if (bucketCache != null) { 342 bucketCache.shutdown(); 343 } 344 } 345 TEST_UTIL.cleanupTestDir(); 346 } 347 348 /** 349 * When using persistent bucket cache, there may be crashes between persisting the backing map and 350 * syncing new blocks to the cache file itself, leading to an inconsistent state between the cache 351 * keys and the cached data. This is to make sure the cache keys are updated accordingly, and the 352 * keys that are still valid do succeed in retrieve related block data from the cache without any 353 * corruption. 354 * @throws Exception the exception 355 */ 356 @Test 357 public void testBucketCacheRecovery() throws Exception { 358 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 359 Path testDir = TEST_UTIL.getDataTestDir(); 360 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 361 Configuration conf = HBaseConfiguration.create(); 362 // Disables the persister thread by setting its interval to MAX_VALUE 363 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 364 String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 365 BucketCache bucketCache = null; 366 BucketCache newBucketCache = null; 367 try { 368 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 369 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 370 DEFAULT_ERROR_TOLERATION_DURATION, conf); 371 assertTrue(bucketCache.waitForCacheInitialization(10000)); 372 373 CacheTestUtils.HFileBlockPair[] blocks = 374 CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4); 375 // Add three blocks 376 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock()); 377 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock()); 378 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock()); 379 // saves the current state 380 bucketCache.persistToFile(); 381 // evicts first block 382 bucketCache.evictBlock(blocks[0].getBlockName()); 383 384 // now adds a fourth block to bucket cache 385 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock()); 386 // Creates new bucket cache instance without persisting to file after evicting first block 387 // and caching fourth block. So the bucket cache file has only the last three blocks, 388 // but backing map (containing cache keys) was persisted when first three blocks 389 // were in the cache. So the state on this recovery is: 390 // - Backing map: [block0, block1, block2] 391 // - Cache: [block1, block2, block3] 392 // Therefore, this bucket cache would be able to recover only block1 and block2. 393 newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 394 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 395 DEFAULT_ERROR_TOLERATION_DURATION, conf); 396 assertTrue(newBucketCache.waitForCacheInitialization(10000)); 397 waitPersistentCacheValidation(conf, newBucketCache); 398 assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false)); 399 assertEquals(blocks[1].getBlock(), 400 newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false)); 401 assertEquals(blocks[2].getBlock(), 402 newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false)); 403 assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false)); 404 assertEquals(2, newBucketCache.backingMap.size()); 405 } finally { 406 if (bucketCache != null) { 407 bucketCache.shutdown(); 408 } 409 if (newBucketCache != null) { 410 newBucketCache.shutdown(); 411 } 412 } 413 TEST_UTIL.cleanupTestDir(); 414 } 415 416 @Test 417 public void testSingleChunk() throws Exception { 418 testChunkedBackingMapRecovery(5, 5); 419 } 420 421 @Test 422 public void testCompletelyFilledChunks() throws Exception { 423 // Test where the all the chunks are complete with chunkSize entries 424 testChunkedBackingMapRecovery(5, 10); 425 } 426 427 @Test 428 public void testPartiallyFilledChunks() throws Exception { 429 // Test where the last chunk is not completely filled. 430 testChunkedBackingMapRecovery(5, 13); 431 } 432 433 private void testChunkedBackingMapRecovery(int chunkSize, int numBlocks) throws Exception { 434 HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 435 Path testDir = TEST_UTIL.getDataTestDir(); 436 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 437 Configuration conf = HBaseConfiguration.create(); 438 conf.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, chunkSize); 439 440 String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); 441 BucketCache bucketCache = null; 442 BucketCache newBucketCache = null; 443 try { 444 bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 445 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 446 DEFAULT_ERROR_TOLERATION_DURATION, conf); 447 assertTrue(bucketCache.waitForCacheInitialization(10000)); 448 449 CacheTestUtils.HFileBlockPair[] blocks = 450 CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks); 451 452 for (int i = 0; i < numBlocks; i++) { 453 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(), 454 blocks[i].getBlock()); 455 } 456 457 // saves the current state 458 bucketCache.persistToFile(); 459 460 // Create a new bucket which reads from persistence file. 461 newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 462 constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 463 DEFAULT_ERROR_TOLERATION_DURATION, conf); 464 assertTrue(newBucketCache.waitForCacheInitialization(10000)); 465 waitPersistentCacheValidation(conf, newBucketCache); 466 assertEquals(numBlocks, newBucketCache.backingMap.size()); 467 for (int i = 0; i < numBlocks; i++) { 468 assertEquals(blocks[i].getBlock(), 469 newBucketCache.getBlock(blocks[i].getBlockName(), false, false, false)); 470 } 471 } finally { 472 if (bucketCache != null) { 473 bucketCache.shutdown(); 474 } 475 if (newBucketCache != null) { 476 newBucketCache.shutdown(); 477 } 478 } 479 TEST_UTIL.cleanupTestDir(); 480 } 481 482 private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) 483 throws InterruptedException { 484 while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { 485 Thread.sleep(100); 486 } 487 } 488 489 // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer 490 // threads will flush it to the bucket and put reference entry in backingMap. 491 private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, 492 Cacheable block) throws InterruptedException { 493 cache.cacheBlock(cacheKey, block); 494 waitUntilFlushedToBucket(cache, cacheKey); 495 } 496 497 private void waitPersistentCacheValidation(Configuration config, final BucketCache bucketCache) { 498 Waiter.waitFor(config, 5000, () -> bucketCache.getBackingMapValidated().get()); 499 } 500}