001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
053
054/**
055 * Basic test for check file's integrity before start BucketCache in fileIOEngine
056 */
057@RunWith(Parameterized.class)
058@Category(SmallTests.class)
059public class TestVerifyBucketCacheFile {
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062    HBaseClassTestRule.forClass(TestVerifyBucketCacheFile.class);
063
064  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
065  public static Iterable<Object[]> data() {
066    return Arrays.asList(new Object[][] { { 8192, null },
067      { 16 * 1024,
068        new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
069          28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
070          128 * 1024 + 1024 } } });
071  }
072
073  @Parameterized.Parameter(0)
074  public int constructedBlockSize;
075
076  @Parameterized.Parameter(1)
077  public int[] constructedBlockSizes;
078
079  final long capacitySize = 32 * 1024 * 1024;
080  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
081  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
082
083  /**
084   * Test cache file or persistence file does not exist whether BucketCache starts normally (1)
085   * Start BucketCache and add some blocks, then shutdown BucketCache and persist cache to file.
086   * Restart BucketCache and it can restore cache from file. (2) Delete bucket cache file after
087   * shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the cache file
088   * and persistence file would be deleted before BucketCache start normally. (3) Delete persistence
089   * file after shutdown BucketCache. Restart BucketCache and it can't restore cache from file, the
090   * cache file and persistence file would be deleted before BucketCache start normally.
091   * @throws Exception the exception
092   */
093  @Test
094  public void testRetrieveFromFile() throws Exception {
095    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
096    Path testDir = TEST_UTIL.getDataTestDir();
097    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
098
099    BucketCache bucketCache =
100      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
101        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
102    long usedSize = bucketCache.getAllocator().getUsedSize();
103    assertEquals(0, usedSize);
104    CacheTestUtils.HFileBlockPair[] blocks =
105      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
106    // Add blocks
107    for (CacheTestUtils.HFileBlockPair block : blocks) {
108      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
109    }
110    usedSize = bucketCache.getAllocator().getUsedSize();
111    assertNotEquals(0, usedSize);
112    // 1.persist cache to file
113    bucketCache.shutdown();
114    // restore cache from file
115    bucketCache =
116      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
117        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
118    assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
119    // persist cache to file
120    bucketCache.shutdown();
121
122    // 2.delete bucket cache file
123    final java.nio.file.Path cacheFile =
124      FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
125    assertTrue(Files.deleteIfExists(cacheFile));
126    // can't restore cache from file
127    bucketCache =
128      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
129        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
130    Thread.sleep(100);
131    assertEquals(0, bucketCache.getAllocator().getUsedSize());
132    assertEquals(0, bucketCache.backingMap.size());
133    // Add blocks
134    for (CacheTestUtils.HFileBlockPair block : blocks) {
135      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
136    }
137    usedSize = bucketCache.getAllocator().getUsedSize();
138    assertNotEquals(0, usedSize);
139    // persist cache to file
140    bucketCache.shutdown();
141
142    // 3.delete backingMap persistence file
143    final java.nio.file.Path mapFile =
144      FileSystems.getDefault().getPath(testDir.toString(), "bucket.persistence");
145    assertTrue(Files.deleteIfExists(mapFile));
146    // can't restore cache from file
147    bucketCache =
148      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
149        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
150    Thread.sleep(100);
151    assertEquals(0, bucketCache.getAllocator().getUsedSize());
152    assertEquals(0, bucketCache.backingMap.size());
153
154    TEST_UTIL.cleanupTestDir();
155  }
156
157  @Test
158  public void testRetrieveFromFileAfterDelete() throws Exception {
159
160    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
161    Path testDir = TEST_UTIL.getDataTestDir();
162    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
163    Configuration conf = TEST_UTIL.getConfiguration();
164    conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300);
165    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
166    BucketCache bucketCache =
167      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
168        constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf);
169
170    long usedSize = bucketCache.getAllocator().getUsedSize();
171    assertEquals(0, usedSize);
172    CacheTestUtils.HFileBlockPair[] blocks =
173      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
174    // Add blocks
175    for (CacheTestUtils.HFileBlockPair block : blocks) {
176      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
177    }
178    usedSize = bucketCache.getAllocator().getUsedSize();
179    assertNotEquals(0, usedSize);
180    // Shutdown BucketCache
181    bucketCache.shutdown();
182    // Delete the persistence file
183    File mapFile = new File(mapFileName);
184    assertTrue(mapFile.delete());
185    Thread.sleep(350);
186    // Create BucketCache
187    bucketCache =
188      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
189        constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf);
190    assertEquals(0, bucketCache.getAllocator().getUsedSize());
191    assertEquals(0, bucketCache.backingMap.size());
192  }
193
194  /**
195   * Test whether BucketCache is started normally after modifying the cache file. Start BucketCache
196   * and add some blocks, then shutdown BucketCache and persist cache to file. Restart BucketCache
197   * after modify cache file's data, and it can't restore cache from file, the cache file and
198   * persistence file would be deleted before BucketCache start normally.
199   * @throws Exception the exception
200   */
201  @Test
202  public void testModifiedBucketCacheFileData() throws Exception {
203    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
204    Path testDir = TEST_UTIL.getDataTestDir();
205    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
206
207    Configuration conf = HBaseConfiguration.create();
208    // Disables the persister thread by setting its interval to MAX_VALUE
209    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
210    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
211      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
212      testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
213    long usedSize = bucketCache.getAllocator().getUsedSize();
214    assertEquals(0, usedSize);
215
216    CacheTestUtils.HFileBlockPair[] blocks =
217      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
218    // Add blocks
219    for (CacheTestUtils.HFileBlockPair block : blocks) {
220      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
221    }
222    usedSize = bucketCache.getAllocator().getUsedSize();
223    assertNotEquals(0, usedSize);
224    // persist cache to file
225    bucketCache.shutdown();
226
227    // modified bucket cache file
228    String file = testDir + "/bucket.cache";
229    try (BufferedWriter out =
230      new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, false)))) {
231      out.write("test bucket cache");
232    }
233    // can't restore cache from file
234    bucketCache =
235      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
236        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
237    Thread.sleep(100);
238    assertEquals(0, bucketCache.getAllocator().getUsedSize());
239    assertEquals(0, bucketCache.backingMap.size());
240
241    TEST_UTIL.cleanupTestDir();
242  }
243
244  /**
245   * Test whether BucketCache is started normally after modifying the cache file's last modified
246   * time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist cache
247   * to file. Then Restart BucketCache after modify cache file's last modified time. HBASE-XXXX has
248   * modified persistence cache such that now we store extra 8 bytes at the end of each block in the
249   * cache, representing the nanosecond time the block has been cached. So in the event the cache
250   * file has failed checksum verification during loading time, we go through all the cached blocks
251   * in the cache map and validate the cached time long between what is in the map and the cache
252   * file. If that check fails, we pull the cache key entry out of the map. Since in this test we
253   * are only modifying the access time to induce a checksum error, the cache file content is still
254   * valid and the extra verification should validate that all cache keys in the map are still
255   * recoverable from the cache.
256   * @throws Exception the exception
257   */
258  @Test
259  public void testModifiedBucketCacheFileTime() throws Exception {
260    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
261    Path testDir = TEST_UTIL.getDataTestDir();
262    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
263
264    BucketCache bucketCache =
265      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
266        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
267    long usedSize = bucketCache.getAllocator().getUsedSize();
268    assertEquals(0, usedSize);
269
270    Pair<String, Long> myPair = new Pair<>();
271
272    CacheTestUtils.HFileBlockPair[] blocks =
273      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
274    // Add blocks
275    for (CacheTestUtils.HFileBlockPair block : blocks) {
276      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
277    }
278    usedSize = bucketCache.getAllocator().getUsedSize();
279    assertNotEquals(0, usedSize);
280    long blockCount = bucketCache.backingMap.size();
281    assertNotEquals(0, blockCount);
282    // persist cache to file
283    bucketCache.shutdown();
284
285    // modified bucket cache file LastModifiedTime
286    final java.nio.file.Path file =
287      FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
288    Files.setLastModifiedTime(file, FileTime.from(Instant.now().plusMillis(1_000)));
289    // can't restore cache from file
290    bucketCache =
291      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
292        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
293    assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
294    assertEquals(blockCount, bucketCache.backingMap.size());
295
296    TEST_UTIL.cleanupTestDir();
297  }
298
299  /**
300   * When using persistent bucket cache, there may be crashes between persisting the backing map and
301   * syncing new blocks to the cache file itself, leading to an inconsistent state between the cache
302   * keys and the cached data. This is to make sure the cache keys are updated accordingly, and the
303   * keys that are still valid do succeed in retrieve related block data from the cache without any
304   * corruption.
305   * @throws Exception the exception
306   */
307  @Test
308  public void testBucketCacheRecovery() throws Exception {
309    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
310    Path testDir = TEST_UTIL.getDataTestDir();
311    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
312    Configuration conf = HBaseConfiguration.create();
313    // Disables the persister thread by setting its interval to MAX_VALUE
314    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
315    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
316    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
317      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
318      DEFAULT_ERROR_TOLERATION_DURATION, conf);
319
320    CacheTestUtils.HFileBlockPair[] blocks =
321      CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4);
322    // Add three blocks
323    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
324    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
325    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
326    // saves the current state
327    bucketCache.persistToFile();
328    // evicts first block
329    bucketCache.evictBlock(blocks[0].getBlockName());
330
331    // now adds a fourth block to bucket cache
332    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
333    // Creates new bucket cache instance without persisting to file after evicting first block
334    // and caching fourth block. So the bucket cache file has only the last three blocks,
335    // but backing map (containing cache keys) was persisted when first three blocks
336    // were in the cache. So the state on this recovery is:
337    // - Backing map: [block0, block1, block2]
338    // - Cache: [block1, block2, block3]
339    // Therefore, this bucket cache would be able to recover only block1 and block2.
340    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
341      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
342      DEFAULT_ERROR_TOLERATION_DURATION, conf);
343
344    assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false));
345    assertEquals(blocks[1].getBlock(),
346      newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
347    assertEquals(blocks[2].getBlock(),
348      newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
349    assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
350    assertEquals(2, newBucketCache.backingMap.size());
351    TEST_UTIL.cleanupTestDir();
352  }
353
354  private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
355    throws InterruptedException {
356    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
357      Thread.sleep(100);
358    }
359  }
360
361  // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
362  // threads will flush it to the bucket and put reference entry in backingMap.
363  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
364    Cacheable block) throws InterruptedException {
365    cache.cacheBlock(cacheKey, block);
366    waitUntilFlushedToBucket(cache, cacheKey);
367  }
368}