001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.junit.Assert.assertFalse; 021import static org.junit.Assert.assertTrue; 022 023import java.io.File; 024import java.io.IOException; 025import java.util.Iterator; 026import java.util.Map; 027import java.util.Random; 028import java.util.concurrent.ThreadLocalRandom; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseTestingUtil; 034import org.apache.hadoop.hbase.KeyValue; 035import org.apache.hadoop.hbase.fs.HFileSystem; 036import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 037import org.apache.hadoop.hbase.io.hfile.BlockType; 038import org.apache.hadoop.hbase.io.hfile.CacheConfig; 039import org.apache.hadoop.hbase.io.hfile.HFile; 040import org.apache.hadoop.hbase.io.hfile.HFileBlock; 041import org.apache.hadoop.hbase.io.hfile.HFileContext; 042import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 043import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor; 044import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil; 045import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 046import org.apache.hadoop.hbase.testclassification.IOTests; 047import org.apache.hadoop.hbase.testclassification.MediumTests; 048import org.junit.ClassRule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051import org.junit.rules.TestName; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055@Category({ IOTests.class, MediumTests.class }) 056public class TestBucketCachePersister { 057 058 @ClassRule 059 public static final HBaseClassTestRule CLASS_RULE = 060 HBaseClassTestRule.forClass(TestBucketCachePersister.class); 061 062 public TestName name = new TestName(); 063 064 public int constructedBlockSize = 16 * 1024; 065 066 private static final Logger LOG = LoggerFactory.getLogger(TestBucketCachePersister.class); 067 068 public int[] constructedBlockSizes = 069 new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, 070 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024 }; 071 072 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 073 074 private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2; 075 private static final int DATA_BLOCK_SIZE = 2048; 076 private static final int NUM_KV = 1000; 077 078 final long capacitySize = 32 * 1024 * 1024; 079 final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; 080 final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; 081 Path testDir; 082 083 public Configuration setupBucketCacheConfig(long bucketCachePersistInterval) throws IOException { 084 Configuration conf; 085 conf = TEST_UTIL.getConfiguration(); 086 conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); 087 testDir = TEST_UTIL.getDataTestDir(); 088 TEST_UTIL.getTestFileSystem().mkdirs(testDir); 089 conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, bucketCachePersistInterval); 090 return conf; 091 } 092 093 public BucketCache setupBucketCache(Configuration conf, String persistentCacheFile) 094 throws IOException { 095 BucketCache bucketCache = new BucketCache("file:" + testDir + "/" + persistentCacheFile, 096 capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 097 testDir + "/bucket.persistence", 60 * 1000, conf); 098 return bucketCache; 099 } 100 101 public void cleanupBucketCache(BucketCache bucketCache) throws IOException { 102 bucketCache.shutdown(); 103 TEST_UTIL.cleanupDataTestDirOnTestFS(String.valueOf(testDir)); 104 assertFalse(TEST_UTIL.getTestFileSystem().exists(testDir)); 105 } 106 107 @Test 108 public void testPrefetchPersistenceCrash() throws Exception { 109 long bucketCachePersistInterval = 3000; 110 Configuration conf = setupBucketCacheConfig(bucketCachePersistInterval); 111 BucketCache bucketCache = setupBucketCache(conf, "testPrefetchPersistenceCrash"); 112 CacheConfig cacheConf = new CacheConfig(conf, bucketCache); 113 FileSystem fs = HFileSystem.get(conf); 114 // Load Cache 115 Path storeFile = writeStoreFile("TestPrefetch0", conf, cacheConf, fs); 116 Path storeFile2 = writeStoreFile("TestPrefetch1", conf, cacheConf, fs); 117 readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache); 118 readStoreFile(storeFile2, 0, fs, cacheConf, conf, bucketCache); 119 Thread.sleep(bucketCachePersistInterval); 120 assertTrue(new File(testDir + "/bucket.persistence").exists()); 121 assertTrue(new File(testDir + "/bucket.persistence").delete()); 122 cleanupBucketCache(bucketCache); 123 } 124 125 @Test 126 public void testPrefetchPersistenceCrashNegative() throws Exception { 127 long bucketCachePersistInterval = Long.MAX_VALUE; 128 Configuration conf = setupBucketCacheConfig(bucketCachePersistInterval); 129 BucketCache bucketCache = setupBucketCache(conf, "testPrefetchPersistenceCrashNegative"); 130 CacheConfig cacheConf = new CacheConfig(conf, bucketCache); 131 FileSystem fs = HFileSystem.get(conf); 132 // Load Cache 133 Path storeFile = writeStoreFile("TestPrefetch2", conf, cacheConf, fs); 134 readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache); 135 assertFalse(new File(testDir + "/bucket.persistence").exists()); 136 cleanupBucketCache(bucketCache); 137 } 138 139 @Test 140 public void testPrefetchListUponBlockEviction() throws Exception { 141 Configuration conf = setupBucketCacheConfig(200); 142 BucketCache bucketCache = setupBucketCache(conf, "testPrefetchListUponBlockEviction"); 143 CacheConfig cacheConf = new CacheConfig(conf, bucketCache); 144 FileSystem fs = HFileSystem.get(conf); 145 // Load Blocks in cache 146 Path storeFile = writeStoreFile("TestPrefetch3", conf, cacheConf, fs); 147 readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache); 148 int retries = 0; 149 while (!bucketCache.fullyCachedFiles.containsKey(storeFile.getName()) && retries < 5) { 150 Thread.sleep(500); 151 retries++; 152 } 153 assertTrue(retries < 5); 154 BlockCacheKey bucketCacheKey = bucketCache.backingMap.entrySet().iterator().next().getKey(); 155 // Evict Blocks from cache 156 bucketCache.evictBlock(bucketCacheKey); 157 assertFalse(bucketCache.fullyCachedFiles.containsKey(storeFile.getName())); 158 cleanupBucketCache(bucketCache); 159 } 160 161 @Test 162 public void testPrefetchBlockEvictionWhilePrefetchRunning() throws Exception { 163 Configuration conf = setupBucketCacheConfig(200); 164 BucketCache bucketCache = 165 setupBucketCache(conf, "testPrefetchBlockEvictionWhilePrefetchRunning"); 166 CacheConfig cacheConf = new CacheConfig(conf, bucketCache); 167 FileSystem fs = HFileSystem.get(conf); 168 // Load Blocks in cache 169 Path storeFile = writeStoreFile("TestPrefetch3", conf, cacheConf, fs); 170 HFile.createReader(fs, storeFile, cacheConf, true, conf); 171 boolean evicted = false; 172 while (!PrefetchExecutor.isCompleted(storeFile)) { 173 LOG.debug("Entered loop as prefetch for {} is still running.", storeFile); 174 if (bucketCache.backingMap.size() > 0 && !evicted) { 175 Iterator<Map.Entry<BlockCacheKey, BucketEntry>> it = 176 bucketCache.backingMap.entrySet().iterator(); 177 // Evict a data block from cache 178 Map.Entry<BlockCacheKey, BucketEntry> entry = it.next(); 179 while (it.hasNext() && !evicted) { 180 if (entry.getKey().getBlockType().equals(BlockType.DATA)) { 181 evicted = bucketCache.evictBlock(it.next().getKey()); 182 LOG.debug("Attempted eviction for {}. Succeeded? {}", storeFile, evicted); 183 } 184 } 185 } 186 Thread.sleep(10); 187 } 188 assertFalse(bucketCache.fullyCachedFiles.containsKey(storeFile.getName())); 189 cleanupBucketCache(bucketCache); 190 } 191 192 public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheConfig cacheConf, 193 Configuration conf, BucketCache bucketCache) throws Exception { 194 // Open the file 195 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); 196 197 while (!reader.prefetchComplete()) { 198 // Sleep for a bit 199 Thread.sleep(1000); 200 } 201 HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null); 202 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); 203 BucketEntry be = bucketCache.backingMap.get(blockCacheKey); 204 boolean isCached = bucketCache.getBlock(blockCacheKey, true, false, true) != null; 205 206 if ( 207 block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX 208 || block.getBlockType() == BlockType.INTERMEDIATE_INDEX 209 ) { 210 assertTrue(isCached); 211 } 212 } 213 214 public Path writeStoreFile(String fname, Configuration conf, CacheConfig cacheConf, FileSystem fs) 215 throws IOException { 216 Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname); 217 HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); 218 StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs) 219 .withOutputDir(storeFileParentDir).withFileContext(meta).build(); 220 Random rand = ThreadLocalRandom.current(); 221 final int rowLen = 32; 222 for (int i = 0; i < NUM_KV; ++i) { 223 byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i); 224 byte[] v = RandomKeyValueUtil.randomValue(rand); 225 int cfLen = rand.nextInt(k.length - rowLen + 1); 226 KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen, 227 k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length); 228 sfw.append(kv); 229 } 230 231 sfw.close(); 232 return sfw.getPath(); 233 } 234 235 public static KeyValue.Type generateKeyType(Random rand) { 236 if (rand.nextBoolean()) { 237 // Let's make half of KVs puts. 238 return KeyValue.Type.Put; 239 } else { 240 KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; 241 if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { 242 throw new RuntimeException("Generated an invalid key type: " + keyType + ". " 243 + "Probably the layout of KeyValue.Type has changed."); 244 } 245 return keyType; 246 } 247 } 248 249}