/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Basic tests verifying the cache file's integrity before starting a BucketCache backed by a
 * file IOEngine.
 */
@RunWith(Parameterized.class)
@Category(SmallTests.class)
public class TestVerifyBucketCacheFile {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestVerifyBucketCacheFile.class);

  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] { { 8192, null },
      { 16 * 1024,
        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
          128 * 1024 + 1024 } } });
  }

  @Parameterized.Parameter(0)
  public int constructedBlockSize;

  @Parameterized.Parameter(1)
  public int[] constructedBlockSizes;

  final long capacitySize = 32 * 1024 * 1024;
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;

  /**
   * Tests whether BucketCache starts normally when the cache file or the persistence file is
   * missing. (1) Start BucketCache and add some blocks, then shut it down so the cache is
   * persisted to file; on restart, BucketCache restores the cache from that file. (2) Delete the
   * bucket cache file after shutting down BucketCache; on restart, the cache cannot be restored,
   * so both the cache file and the persistence file are deleted before BucketCache starts
   * normally. (3) Delete the persistence file after shutting down BucketCache; on restart, the
   * cache likewise cannot be restored, and the same cleanup happens before BucketCache starts
   * normally.
   * @throws Exception the exception
   */
  @Test
  public void testRetrieveFromFile() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);

    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);

    BucketCache bucketCache = null;
    BucketCache recoveredBucketCache = null;
    try {
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
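      // Cache initialization (including retrieval of any persisted state) runs asynchronously;
      // wait up to 10 seconds for it to complete before asserting on the cache contents.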
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      CacheTestUtils.HFileBlockPair[] blocks =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      // Add blocks
      for (CacheTestUtils.HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      // 1. Persist cache to file
      bucketCache.shutdown();
      // restore cache from file
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      waitPersistentCacheValidation(conf, bucketCache);
      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
      // persist cache to file
      bucketCache.shutdown();

      // 2. Delete the bucket cache file
      final java.nio.file.Path cacheFile =
        FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
      assertTrue(Files.deleteIfExists(cacheFile));
      // can't restore cache from file
      recoveredBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(recoveredBucketCache.waitForCacheInitialization(10000));
      waitPersistentCacheValidation(conf, recoveredBucketCache);
      assertEquals(0, recoveredBucketCache.getAllocator().getUsedSize());
      assertEquals(0, recoveredBucketCache.backingMap.size());
      // Add blocks
      for (CacheTestUtils.HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(recoveredBucketCache, block.getBlockName(),
          block.getBlock());
      }
      usedSize = recoveredBucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      // persist cache to file
      recoveredBucketCache.shutdown();

      // 3. Delete the backingMap persistence file
      final java.nio.file.Path mapFile =
        FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence");
      assertTrue(Files.deleteIfExists(mapFile));
      // can't restore cache from file
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      waitPersistentCacheValidation(conf, bucketCache);
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      if (recoveredBucketCache != null) {
        recoveredBucketCache.shutdown();
      }
    }
    TEST_UTIL.cleanupTestDir();
  }

  @Test
  public void testRetrieveFromFileAfterDelete() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = TEST_UTIL.getConfiguration();
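    // Run the background persister thread frequently (every 300 ms) so that it kicks in shortly
    // after the persistence file is deleted below.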
    conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300);
    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
        DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);
      CacheTestUtils.HFileBlockPair[] blocks =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      // Add blocks
      for (CacheTestUtils.HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      // Shutdown BucketCache
      bucketCache.shutdown();
      // Delete the persistence file
      File mapFile = new File(mapFileName);
      assertTrue(mapFile.delete());
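      // Sleep slightly longer than the 300 ms persister interval so the persister thread gets a
      // chance to run against the now-deleted file.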
      Thread.sleep(350);
      // Create BucketCache
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
        DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      waitPersistentCacheValidation(conf, bucketCache);
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
    }
  }

  /**
   * Tests whether BucketCache starts normally after the cache file has been modified. Start
   * BucketCache and add some blocks, then shut it down so the cache is persisted to file. Restart
   * BucketCache after modifying the cache file's data; the cache cannot be restored, so both the
   * cache file and the persistence file are deleted before BucketCache starts normally.
   * @throws Exception the exception
   */
  @Test
  public void testModifiedBucketCacheFileData() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);

    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);

      CacheTestUtils.HFileBlockPair[] blocks =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      // Add blocks
      for (CacheTestUtils.HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      // persist cache to file
      bucketCache.shutdown();

      // Corrupt the bucket cache file by overwriting its contents
      String file = testDir + "/bucket.cache";
      try (BufferedWriter out =
        new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, false)))) {
        out.write("test bucket cache");
      }
      // can't restore cache from file
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      waitPersistentCacheValidation(conf, bucketCache);
      assertEquals(0, bucketCache.getAllocator().getUsedSize());
      assertEquals(0, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
    }
    TEST_UTIL.cleanupTestDir();
  }

  /**
   * Tests whether BucketCache starts normally after the cache file's last modified time has been
   * changed. First start BucketCache and add some blocks, then shut it down so the cache is
   * persisted to file, and restart BucketCache after modifying the cache file's last modified
   * time. HBASE-XXXX changed cache persistence so that an extra 8 bytes is now stored at the end
   * of each block in the cache, representing the nanosecond time at which the block was cached.
   * If the cache file fails checksum verification at load time, we iterate over all cached blocks
   * in the backing map and compare the cached time recorded in the map against the one in the
   * cache file, pulling the cache key out of the map whenever this check fails. Since this test
   * only modifies the file's timestamp to induce a checksum error, the cache file content is
   * still valid, and the extra verification should confirm that all cache keys in the map remain
   * recoverable from the cache.
   * @throws Exception the exception
   */
  @Test
  public void testModifiedBucketCacheFileTime() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
    BucketCache bucketCache = null;
    try {
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      long usedSize = bucketCache.getAllocator().getUsedSize();
      assertEquals(0, usedSize);

      CacheTestUtils.HFileBlockPair[] blocks =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
      // Add blocks
      for (CacheTestUtils.HFileBlockPair block : blocks) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
      }
      usedSize = bucketCache.getAllocator().getUsedSize();
      assertNotEquals(0, usedSize);
      long blockCount = bucketCache.backingMap.size();
      assertNotEquals(0, blockCount);
      // persist cache to file
      bucketCache.shutdown();

      // Change the cache file's last modified time so that startup checksum verification fails
      final java.nio.file.Path file =
        FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
      Files.setLastModifiedTime(file, FileTime.from(Instant.now().plusMillis(1_000)));
      // The checksum check fails, but the block-level cached-time verification should still
      // recover every block, since the file contents are untouched
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
        testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));
      waitPersistentCacheValidation(conf, bucketCache);
      assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
      assertEquals(blockCount, bucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
    }
    TEST_UTIL.cleanupTestDir();
  }

  /**
   * When using a persistent bucket cache, a crash may occur between persisting the backing map
   * and syncing new blocks to the cache file itself, leaving the cache keys and the cached data
   * in an inconsistent state. This test makes sure the cache keys are updated accordingly, and
   * that the keys which remain valid can still retrieve their block data from the cache without
   * any corruption.
   * @throws Exception the exception
   */
  @Test
  public void testBucketCacheRecovery() throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
    // Disables the persister thread by setting its interval to MAX_VALUE
    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
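    // Use a timestamp-suffixed persistence file name so this run does not pick up a stale file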
    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    BucketCache newBucketCache = null;
    try {
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
        DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));

      CacheTestUtils.HFileBlockPair[] blocks =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4);
      // Add three blocks
      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
      // saves the current state
      bucketCache.persistToFile();
      // evicts first block
      bucketCache.evictBlock(blocks[0].getBlockName());

      // now adds a fourth block to bucket cache
      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
      // Creates new bucket cache instance without persisting to file after evicting first block
      // and caching fourth block. So the bucket cache file has only the last three blocks,
      // but backing map (containing cache keys) was persisted when first three blocks
      // were in the cache. So the state on this recovery is:
      // - Backing map: [block0, block1, block2]
      // - Cache: [block1, block2, block3]
      // Therefore, this bucket cache would be able to recover only block1 and block2.
      newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
        DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(newBucketCache.waitForCacheInitialization(10000));
      waitPersistentCacheValidation(conf, newBucketCache);
      assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false));
      assertEquals(blocks[1].getBlock(),
        newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
      assertEquals(blocks[2].getBlock(),
        newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
      assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
      assertEquals(2, newBucketCache.backingMap.size());
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      if (newBucketCache != null) {
        newBucketCache.shutdown();
      }
    }
    TEST_UTIL.cleanupTestDir();
  }

415
416  @Test
417  public void testSingleChunk() throws Exception {
418    testChunkedBackingMapRecovery(5, 5);
419  }
420
421  @Test
422  public void testCompletelyFilledChunks() throws Exception {
    // Test where all the chunks are completely filled with chunkSize entries
    testChunkedBackingMapRecovery(5, 10);
  }

  @Test
  public void testPartiallyFilledChunks() throws Exception {
    // Test where the last chunk is not completely filled.
    testChunkedBackingMapRecovery(5, 13);
  }

  private void testChunkedBackingMapRecovery(int chunkSize, int numBlocks) throws Exception {
    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
    Path testDir = TEST_UTIL.getDataTestDir();
    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
    Configuration conf = HBaseConfiguration.create();
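    // Persist the backing map in chunks of chunkSize entries rather than as a single blob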
    conf.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, chunkSize);

    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
    BucketCache bucketCache = null;
    BucketCache newBucketCache = null;
    try {
      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
        DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(bucketCache.waitForCacheInitialization(10000));

      CacheTestUtils.HFileBlockPair[] blocks =
        CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks);

      for (int i = 0; i < numBlocks; i++) {
        cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(),
          blocks[i].getBlock());
      }

      // saves the current state
      bucketCache.persistToFile();

      // Create a new bucket cache which reads from the persistence file.
      newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
        DEFAULT_ERROR_TOLERATION_DURATION, conf);
      assertTrue(newBucketCache.waitForCacheInitialization(10000));
      waitPersistentCacheValidation(conf, newBucketCache);
      assertEquals(numBlocks, newBucketCache.backingMap.size());
      for (int i = 0; i < numBlocks; i++) {
        assertEquals(blocks[i].getBlock(),
          newBucketCache.getBlock(blocks[i].getBlockName(), false, false, false));
      }
    } finally {
      if (bucketCache != null) {
        bucketCache.shutdown();
      }
      if (newBucketCache != null) {
        newBucketCache.shutdown();
      }
    }
    TEST_UTIL.cleanupTestDir();
  }

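  // Polls until the writer threads have drained the block from the ramCache and installed its
  // entry in the backingMap.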
  private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
    throws InterruptedException {
    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
      Thread.sleep(100);
    }
  }

  // BucketCache.cacheBlock is async: it first adds the block to the ramCache and writeQueue, then
  // writer threads flush it to the bucket and put a reference entry in the backingMap.
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
    Cacheable block) throws InterruptedException {
    cache.cacheBlock(cacheKey, block);
    waitUntilFlushedToBucket(cache, cacheKey);
  }

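  // Validation of a retrieved persistent cache is asynchronous; wait for it to complete before
  // asserting on the recovered cache state.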
  private void waitPersistentCacheValidation(Configuration config, final BucketCache bucketCache) {
    Waiter.waitFor(config, 5000, () -> bucketCache.getBackingMapValidated().get());
  }
}