001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
021import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
022import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertNotEquals;
025import static org.junit.Assert.assertNull;
026import static org.junit.Assert.assertTrue;
027
028import java.io.BufferedWriter;
029import java.io.File;
030import java.io.FileOutputStream;
031import java.io.OutputStreamWriter;
032import java.nio.file.FileSystems;
033import java.nio.file.Files;
034import java.nio.file.attribute.FileTime;
035import java.time.Instant;
036import java.util.Arrays;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseConfiguration;
041import org.apache.hadoop.hbase.HBaseTestingUtil;
042import org.apache.hadoop.hbase.Waiter;
043import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
044import org.apache.hadoop.hbase.io.hfile.CacheConfig;
045import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
046import org.apache.hadoop.hbase.io.hfile.Cacheable;
047import org.apache.hadoop.hbase.testclassification.SmallTests;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.hadoop.hbase.util.Pair;
050import org.junit.ClassRule;
051import org.junit.Test;
052import org.junit.experimental.categories.Category;
053import org.junit.runner.RunWith;
054import org.junit.runners.Parameterized;
055
/**
 * Basic tests that check the integrity of the cache file and the persistence file before
 * BucketCache starts with a file-based IOEngine.
 */
059@RunWith(Parameterized.class)
060@Category(SmallTests.class)
061public class TestVerifyBucketCacheFile {
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064    HBaseClassTestRule.forClass(TestVerifyBucketCacheFile.class);
065
066  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
067  public static Iterable<Object[]> data() {
068    return Arrays.asList(new Object[][] { { 8192, null },
069      { 16 * 1024,
070        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
071          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
072          128 * 1024 + 1024 } } });
073  }
074
  /** Block size used by each test; injected by the Parameterized runner from element 0 of a row. */
  @Parameterized.Parameter(0)
  public int constructedBlockSize;

  /** Bucket sizes used by each test (null in the first row); injected from element 1 of a row. */
  @Parameterized.Parameter(1)
  public int[] constructedBlockSizes;

  /** Capacity argument passed to every BucketCache built in this class (32 * 1024 * 1024). */
  final long capacitySize = 32 * 1024 * 1024;
  /** Writer thread count passed to every BucketCache; taken from BucketCache's default. */
  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
  /** Writer queue length passed to every BucketCache; taken from BucketCache's default. */
  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
084
085  /**
086   * Test cache file or persistence file does not exist whether BucketCache starts normally (1)
087   * Start BucketCache and add some blocks, then shutdown BucketCache and persist cache to file.
088   * Restart BucketCache and it can restore cache from file. (2) Delete bucket cache file after
089   * shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the cache file
090   * and persistence file would be deleted before BucketCache start normally. (3) Delete persistence
091   * file after shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the
092   * cache file and persistence file would be deleted before BucketCache start normally.
093   * @throws Exception the exception
094   */
095  @Test
096  public void testRetrieveFromFile() throws Exception {
097    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
098    Path testDir = TEST_UTIL.getDataTestDir();
099    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
100
101    BucketCache bucketCache =
102      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
103        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
104    assertTrue(bucketCache.waitForCacheInitialization(10000));
105    long usedSize = bucketCache.getAllocator().getUsedSize();
106    assertEquals(0, usedSize);
107    CacheTestUtils.HFileBlockPair[] blocks =
108      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
109    // Add blocks
110    for (CacheTestUtils.HFileBlockPair block : blocks) {
111      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
112    }
113    usedSize = bucketCache.getAllocator().getUsedSize();
114    assertNotEquals(0, usedSize);
115    // 1.persist cache to file
116    bucketCache.shutdown();
117    // restore cache from file
118    bucketCache =
119      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
120        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
121    assertTrue(bucketCache.waitForCacheInitialization(10000));
122    assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
123    // persist cache to file
124    bucketCache.shutdown();
125
126    // 2.delete bucket cache file
127    final java.nio.file.Path cacheFile =
128      FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
129    assertTrue(Files.deleteIfExists(cacheFile));
130    // can't restore cache from file
131    final BucketCache recoveredBucketCache =
132      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
133        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
134    assertTrue(recoveredBucketCache.waitForCacheInitialization(10000));
135    Waiter.waitFor(HBaseConfiguration.create(), 1000,
136      () -> recoveredBucketCache.getBackingMapValidated().get());
137    assertEquals(0, recoveredBucketCache.getAllocator().getUsedSize());
138    assertEquals(0, recoveredBucketCache.backingMap.size());
139    // Add blocks
140    for (CacheTestUtils.HFileBlockPair block : blocks) {
141      cacheAndWaitUntilFlushedToBucket(recoveredBucketCache, block.getBlockName(),
142        block.getBlock());
143    }
144    usedSize = recoveredBucketCache.getAllocator().getUsedSize();
145    assertNotEquals(0, usedSize);
146    // persist cache to file
147    recoveredBucketCache.shutdown();
148
149    // 3.delete backingMap persistence file
150    final java.nio.file.Path mapFile =
151      FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence");
152    assertTrue(Files.deleteIfExists(mapFile));
153    // can't restore cache from file
154    bucketCache =
155      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
156        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
157    assertTrue(bucketCache.waitForCacheInitialization(10000));
158    assertEquals(0, bucketCache.getAllocator().getUsedSize());
159    assertEquals(0, bucketCache.backingMap.size());
160
161    TEST_UTIL.cleanupTestDir();
162  }
163
164  @Test
165  public void testRetrieveFromFileAfterDelete() throws Exception {
166    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
167    Path testDir = TEST_UTIL.getDataTestDir();
168    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
169    Configuration conf = TEST_UTIL.getConfiguration();
170    conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300);
171    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
172    BucketCache bucketCache =
173      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
174        constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf);
175    assertTrue(bucketCache.waitForCacheInitialization(10000));
176
177    long usedSize = bucketCache.getAllocator().getUsedSize();
178    assertEquals(0, usedSize);
179    CacheTestUtils.HFileBlockPair[] blocks =
180      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
181    // Add blocks
182    for (CacheTestUtils.HFileBlockPair block : blocks) {
183      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
184    }
185    usedSize = bucketCache.getAllocator().getUsedSize();
186    assertNotEquals(0, usedSize);
187    // Shutdown BucketCache
188    bucketCache.shutdown();
189    // Delete the persistence file
190    File mapFile = new File(mapFileName);
191    assertTrue(mapFile.delete());
192    Thread.sleep(350);
193    // Create BucketCache
194    bucketCache =
195      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
196        constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf);
197    assertTrue(bucketCache.waitForCacheInitialization(10000));
198    assertEquals(0, bucketCache.getAllocator().getUsedSize());
199    assertEquals(0, bucketCache.backingMap.size());
200  }
201
202  /**
203   * Test whether BucketCache is started normally after modifying the cache file. Start BucketCache
204   * and add some blocks, then shutdown BucketCache and persist cache to file. Restart BucketCache
205   * after modify cache file's data, and it can't restore cache from file, the cache file and
206   * persistence file would be deleted before BucketCache start normally.
207   * @throws Exception the exception
208   */
209  @Test
210  public void testModifiedBucketCacheFileData() throws Exception {
211    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
212    Path testDir = TEST_UTIL.getDataTestDir();
213    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
214
215    Configuration conf = HBaseConfiguration.create();
216    // Disables the persister thread by setting its interval to MAX_VALUE
217    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
218    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
219      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
220      testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
221    assertTrue(bucketCache.waitForCacheInitialization(10000));
222    long usedSize = bucketCache.getAllocator().getUsedSize();
223    assertEquals(0, usedSize);
224
225    CacheTestUtils.HFileBlockPair[] blocks =
226      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
227    // Add blocks
228    for (CacheTestUtils.HFileBlockPair block : blocks) {
229      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
230    }
231    usedSize = bucketCache.getAllocator().getUsedSize();
232    assertNotEquals(0, usedSize);
233    // persist cache to file
234    bucketCache.shutdown();
235
236    // modified bucket cache file
237    String file = testDir + "/bucket.cache";
238    try (BufferedWriter out =
239      new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, false)))) {
240      out.write("test bucket cache");
241    }
242    // can't restore cache from file
243    bucketCache =
244      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
245        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
246    assertTrue(bucketCache.waitForCacheInitialization(10000));
247    assertEquals(0, bucketCache.getAllocator().getUsedSize());
248    assertEquals(0, bucketCache.backingMap.size());
249
250    TEST_UTIL.cleanupTestDir();
251  }
252
253  /**
254   * Test whether BucketCache is started normally after modifying the cache file's last modified
255   * time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist cache
256   * to file. Then Restart BucketCache after modify cache file's last modified time. HBASE-XXXX has
257   * modified persistence cache such that now we store extra 8 bytes at the end of each block in the
258   * cache, representing the nanosecond time the block has been cached. So in the event the cache
259   * file has failed checksum verification during loading time, we go through all the cached blocks
260   * in the cache map and validate the cached time long between what is in the map and the cache
261   * file. If that check fails, we pull the cache key entry out of the map. Since in this test we
262   * are only modifying the access time to induce a checksum error, the cache file content is still
263   * valid and the extra verification should validate that all cache keys in the map are still
264   * recoverable from the cache.
265   * @throws Exception the exception
266   */
267  @Test
268  public void testModifiedBucketCacheFileTime() throws Exception {
269    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
270    Path testDir = TEST_UTIL.getDataTestDir();
271    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
272
273    BucketCache bucketCache =
274      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
275        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
276    assertTrue(bucketCache.waitForCacheInitialization(10000));
277    long usedSize = bucketCache.getAllocator().getUsedSize();
278    assertEquals(0, usedSize);
279
280    Pair<String, Long> myPair = new Pair<>();
281
282    CacheTestUtils.HFileBlockPair[] blocks =
283      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
284    // Add blocks
285    for (CacheTestUtils.HFileBlockPair block : blocks) {
286      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
287    }
288    usedSize = bucketCache.getAllocator().getUsedSize();
289    assertNotEquals(0, usedSize);
290    long blockCount = bucketCache.backingMap.size();
291    assertNotEquals(0, blockCount);
292    // persist cache to file
293    bucketCache.shutdown();
294
295    // modified bucket cache file LastModifiedTime
296    final java.nio.file.Path file =
297      FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
298    Files.setLastModifiedTime(file, FileTime.from(Instant.now().plusMillis(1_000)));
299    // can't restore cache from file
300    bucketCache =
301      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
302        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
303    assertTrue(bucketCache.waitForCacheInitialization(10000));
304    assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
305    assertEquals(blockCount, bucketCache.backingMap.size());
306
307    TEST_UTIL.cleanupTestDir();
308  }
309
310  /**
311   * When using persistent bucket cache, there may be crashes between persisting the backing map and
312   * syncing new blocks to the cache file itself, leading to an inconsistent state between the cache
313   * keys and the cached data. This is to make sure the cache keys are updated accordingly, and the
314   * keys that are still valid do succeed in retrieve related block data from the cache without any
315   * corruption.
316   * @throws Exception the exception
317   */
318  @Test
319  public void testBucketCacheRecovery() throws Exception {
320    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
321    Path testDir = TEST_UTIL.getDataTestDir();
322    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
323    Configuration conf = HBaseConfiguration.create();
324    // Disables the persister thread by setting its interval to MAX_VALUE
325    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
326    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
327    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
328      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
329      DEFAULT_ERROR_TOLERATION_DURATION, conf);
330    assertTrue(bucketCache.waitForCacheInitialization(10000));
331
332    CacheTestUtils.HFileBlockPair[] blocks =
333      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4);
334    // Add three blocks
335    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
336    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
337    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
338    // saves the current state
339    bucketCache.persistToFile();
340    // evicts first block
341    bucketCache.evictBlock(blocks[0].getBlockName());
342
343    // now adds a fourth block to bucket cache
344    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
345    // Creates new bucket cache instance without persisting to file after evicting first block
346    // and caching fourth block. So the bucket cache file has only the last three blocks,
347    // but backing map (containing cache keys) was persisted when first three blocks
348    // were in the cache. So the state on this recovery is:
349    // - Backing map: [block0, block1, block2]
350    // - Cache: [block1, block2, block3]
351    // Therefore, this bucket cache would be able to recover only block1 and block2.
352    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
353      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
354      DEFAULT_ERROR_TOLERATION_DURATION, conf);
355    assertTrue(newBucketCache.waitForCacheInitialization(10000));
356
357    assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false));
358    assertEquals(blocks[1].getBlock(),
359      newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
360    assertEquals(blocks[2].getBlock(),
361      newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
362    assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
363    assertEquals(2, newBucketCache.backingMap.size());
364    TEST_UTIL.cleanupTestDir();
365  }
366
367  @Test
368  public void testSingleChunk() throws Exception {
369    testChunkedBackingMapRecovery(5, 5);
370  }
371
372  @Test
373  public void testCompletelyFilledChunks() throws Exception {
374    // Test where the all the chunks are complete with chunkSize entries
375    testChunkedBackingMapRecovery(5, 10);
376  }
377
378  @Test
379  public void testPartiallyFilledChunks() throws Exception {
380    // Test where the last chunk is not completely filled.
381    testChunkedBackingMapRecovery(5, 13);
382  }
383
384  private void testChunkedBackingMapRecovery(int chunkSize, int numBlocks) throws Exception {
385    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
386    Path testDir = TEST_UTIL.getDataTestDir();
387    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
388    Configuration conf = HBaseConfiguration.create();
389    conf.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, chunkSize);
390
391    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
392    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
393      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
394      DEFAULT_ERROR_TOLERATION_DURATION, conf);
395    assertTrue(bucketCache.waitForCacheInitialization(10000));
396
397    CacheTestUtils.HFileBlockPair[] blocks =
398      CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks);
399
400    for (int i = 0; i < numBlocks; i++) {
401      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(), blocks[i].getBlock());
402    }
403
404    // saves the current state
405    bucketCache.persistToFile();
406
407    // Create a new bucket which reads from persistence file.
408    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
409      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
410      DEFAULT_ERROR_TOLERATION_DURATION, conf);
411    assertTrue(newBucketCache.waitForCacheInitialization(10000));
412    assertEquals(numBlocks, newBucketCache.backingMap.size());
413
414    for (int i = 0; i < numBlocks; i++) {
415      assertEquals(blocks[i].getBlock(),
416        newBucketCache.getBlock(blocks[i].getBlockName(), false, false, false));
417    }
418    TEST_UTIL.cleanupTestDir();
419  }
420
421  private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
422    throws InterruptedException {
423    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
424      Thread.sleep(100);
425    }
426  }
427
  /**
   * Caches the block and then waits until it has been flushed to the bucket.
   * BucketCache.cacheBlock is async: it first adds the block to ramCache and writeQueue, then
   * writer threads flush it to the bucket and put a reference entry in backingMap.
   * @param cache    the cache to add the block to
   * @param cacheKey the key under which the block is cached
   * @param block    the block to cache
   * @throws InterruptedException if the thread is interrupted while waiting for the flush
   */
  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
    Cacheable block) throws InterruptedException {
    cache.cacheBlock(cacheKey, block);
    waitUntilFlushedToBucket(cache, cacheKey);
  }
435}