001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; 021import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertNull; 024 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.HBaseClassTestRule; 028import org.apache.hadoop.hbase.HBaseConfiguration; 029import org.apache.hadoop.hbase.HBaseTestingUtility; 030import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 031import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; 032import org.apache.hadoop.hbase.io.hfile.Cacheable; 033import org.apache.hadoop.hbase.testclassification.SmallTests; 034import org.junit.ClassRule; 035import org.junit.Test; 036import org.junit.experimental.categories.Category; 037 038/** 039 * Basic test for check file's integrity before start BucketCache in fileIOEngine 040 */ 041@Category(SmallTests.class) 042public class TestRecoveryPersistentBucketCache { 043 @ClassRule 044 public static final HBaseClassTestRule CLASS_RULE = 045 HBaseClassTestRule.forClass(TestRecoveryPersistentBucketCache.class); 046 047 final long capacitySize = 32 * 1024 * 1024; 048 final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; 049 final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; 050 051 @Test 052 public void testBucketCacheRecovery() throws Exception { 053 HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 054 Path testDir = TEST_UTIL.getDataTestDir(); 055 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 056 Configuration conf = HBaseConfiguration.create(); 057 // Disables the persister thread by setting its interval to MAX_VALUE 058 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 059 int[] bucketSizes = new int[] { 8 * 1024 + 1024 }; 060 BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 061 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 062 DEFAULT_ERROR_TOLERATION_DURATION, conf); 063 064 CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4); 065 066 CacheTestUtils.HFileBlockPair[] smallerBlocks = CacheTestUtils.generateHFileBlocks(4096, 1); 067 // Add four blocks 068 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock()); 069 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock()); 070 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock()); 071 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock()); 072 // saves the current state of the cache 073 bucketCache.persistToFile(); 074 // evicts the 4th block 075 bucketCache.evictBlock(blocks[3].getBlockName()); 076 // now adds a 5th block to bucket cache. This block is half the size of the previous 077 // blocks, and it will be added in the same offset of the previous evicted block. 078 // This overwrites part of the 4th block. Because we persisted only up to the 079 // 4th block addition, recovery would try to read the whole 4th block, but the cached time 080 // validation will fail, and we'll recover only the first three blocks 081 cacheAndWaitUntilFlushedToBucket(bucketCache, smallerBlocks[0].getBlockName(), 082 smallerBlocks[0].getBlock()); 083 084 // Creates new bucket cache instance without persisting to file after evicting 4th block 085 // and caching 5th block. Here the cache file has the first three blocks, followed by the 086 // 5th block and the second half of 4th block (we evicted 4th block, freeing up its 087 // offset in the cache, then added 5th block which is half the size of other blocks, so it's 088 // going to override the first half of the 4th block in the cache). That's fine because 089 // the in-memory backing map has the right blocks and related offsets. However, the 090 // persistent map file only has information about the first four blocks. We validate the 091 // cache time recorded in the back map against the block data in the cache. This is recorded 092 // in the cache as the first 8 bytes of a block, so the 4th block had its first 8 blocks 093 // now overridden by the 5th block, causing this check to fail and removal of 094 // the 4th block from the backing map. 095 BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 096 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 097 DEFAULT_ERROR_TOLERATION_DURATION, conf); 098 Thread.sleep(100); 099 assertEquals(3, newBucketCache.backingMap.size()); 100 assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false)); 101 assertNull(newBucketCache.getBlock(smallerBlocks[0].getBlockName(), false, false, false)); 102 assertEquals(blocks[0].getBlock(), 103 newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false)); 104 assertEquals(blocks[1].getBlock(), 105 newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false)); 106 assertEquals(blocks[2].getBlock(), 107 newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false)); 108 TEST_UTIL.cleanupTestDir(); 109 } 110 111 @Test 112 public void testBucketCacheEvictByHFileAfterRecovery() throws Exception { 113 HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 114 Path testDir = TEST_UTIL.getDataTestDir(); 115 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 116 Configuration conf = HBaseConfiguration.create(); 117 // Disables the persister thread by setting its interval to MAX_VALUE 118 conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); 119 int[] bucketSizes = new int[] { 8 * 1024 + 1024 }; 120 BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 121 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 122 DEFAULT_ERROR_TOLERATION_DURATION, conf); 123 124 CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4); 125 126 // Add four blocks 127 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock()); 128 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock()); 129 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock()); 130 cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock()); 131 // saves the current state of the cache 132 bucketCache.persistToFile(); 133 134 BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 135 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 136 DEFAULT_ERROR_TOLERATION_DURATION, conf); 137 Thread.sleep(100); 138 assertEquals(4, newBucketCache.backingMap.size()); 139 newBucketCache.evictBlocksByHfileName(blocks[0].getBlockName().getHfileName()); 140 assertEquals(3, newBucketCache.backingMap.size()); 141 TEST_UTIL.cleanupTestDir(); 142 } 143 144 private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) 145 throws InterruptedException { 146 while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { 147 Thread.sleep(100); 148 } 149 } 150 151 // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer 152 // threads will flush it to the bucket and put reference entry in backingMap. 153 private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, 154 Cacheable block) throws InterruptedException { 155 cache.cacheBlock(cacheKey, block); 156 waitUntilFlushedToBucket(cache, cacheKey); 157 } 158 159}