/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
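
/**
 * Tests the prefetch-on-open behaviour of HFiles backed by a file-based {@link BucketCache}:
 * prefetching a file only once, resolving reference files created by a split to the original
 * file's cached blocks, interrupting prefetch when the cache runs out of capacity (except for
 * in-memory column families), and progressing the per-region cached-size metric while blocks
 * are being cached.
 */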
@Category({ IOTests.class, MediumTests.class })
public class TestPrefetchWithBucketCache {

  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchWithBucketCache.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestPrefetchWithBucketCache.class);

  @Rule
  public TestName name = new TestName();

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  private static final int DATA_BLOCK_SIZE = 2048;
  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private BlockCache blockCache;

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
    fs = HFileSystem.get(conf);
    File testDir = new File(name.getMethodName());
    testDir.mkdir();
    conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:/" + testDir.getAbsolutePath() + "/bucket.cache");
  }

  @After
  public void tearDown() {
    File cacheFile = new File(name.getMethodName() + "/bucket.cache");
    File dir = new File(name.getMethodName());
    cacheFile.delete();
    dir.delete();
  }

  @Test
  public void testPrefetchDoesntOverwork() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork", 100);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);
    Map<BlockCacheKey, BucketEntry> snapshot = ImmutableMap.copyOf(bc.getBackingMap());
    // Reads the file again and checks we are not prefetching it again
    LOG.debug("Second read, no prefetch should happen here.");
    readStoreFile(storeFile);
    // Makes sure the cache hasn't changed
    snapshot.entrySet().forEach(e -> {
      BucketEntry entry = bc.getBackingMap().get(e.getKey());
      assertNotNull(entry);
      assertEquals(e.getValue().getCachedTime(), entry.getCachedTime());
    });
    // Forcibly removes the first block from the bucket cache backing map,
    // in order to cause it to be cached again
    BlockCacheKey key = snapshot.keySet().stream().findFirst().get();
    LOG.debug("removing block {}", key);
    bc.getBackingMap().remove(key);
    bc.getFullyCachedFiles().get().remove(storeFile.getName());
    assertTrue(snapshot.size() > bc.getBackingMap().size());
    LOG.debug("Third read should prefetch again, as we removed one block for the file.");
    readStoreFile(storeFile);
    Waiter.waitFor(conf, 300, () -> snapshot.size() == bc.getBackingMap().size());
    assertTrue(snapshot.get(key).getCachedTime() < bc.getBackingMap().get(key).getCachedTime());
  }

  @Test
  public void testPrefetchRefsAfterSplit() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);

    Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchRefsAfterSplit");
    RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Path cfDir = new Path(regionDir, "cf");
    HRegionFileSystem regionFS =
      HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region);
    Path storeFile = writeStoreFile(100, cfDir);
    StoreFileTracker sft = StoreFileTrackerFactory.create(conf, true,
      StoreContext.getBuilder().withRegionFileSystem(regionFS).withFamilyStoreDirectoryPath(cfDir)
        .withCacheConfig(cacheConf).build());
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);

    // Splits the file and creates a reference to the original file
    Random rand = ThreadLocalRandom.current();
    byte[] splitPoint = RandomKeyValueUtil.randomOrderedKey(rand, 50);
    HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true, sft);
    Path ref = regionFS.splitStoreFile(region, "cf", file, splitPoint, false,
      new ConstantSizeRegionSplitPolicy(), sft);
    HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true, sft);
    // Starts a reader for the ref. The ref should resolve to the original file blocks
    // and not duplicate blocks in the cache.
    refHsf.initReader();
    HFile.Reader reader = refHsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    // The ref file block keys should actually resolve to the referred file blocks,
    // so we should not see additional blocks in the cache.
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);

    BlockCacheKey refCacheKey = new BlockCacheKey(ref.getName(), 0);
    Cacheable result = bc.getBlock(refCacheKey, true, false, true);
    assertNotNull(result);
    BlockCacheKey fileCacheKey = new BlockCacheKey(file.getPath().getName(), 0);
    assertEquals(result, bc.getBlock(fileCacheKey, true, false, true));
    assertNull(bc.getBackingMap().get(refCacheKey));
    assertNotNull(bc.getBlockForReference(refCacheKey));
  }

  @Test
  public void testPrefetchInterruptOnCapacity() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.95);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.01);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    createReaderAndWaitForPrefetchInterruption(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    long evictionsFirstPrefetch = bc.getStats().getEvictionCount();
    LOG.debug("evictions after first prefetch: {}", bc.getStats().getEvictionCount());
    HFile.Reader reader = createReaderAndWaitForPrefetchInterruption(storeFile);
    LOG.debug("evictions after second prefetch: {}", bc.getStats().getEvictionCount());
    assertTrue((bc.getStats().getEvictionCount() - evictionsFirstPrefetch) < 10);
    HFileScanner scanner = reader.getScanner(conf, true, true);
    scanner.seekTo();
    while (scanner.next()) {
      // Do a full scan to force some evictions
      LOG.trace("Iterating the full scan to evict some blocks");
    }
    scanner.close();
    LOG.debug("evictions after scanner: {}", bc.getStats().getEvictionCount());
    // The scanner should have triggered at least 3x the evictions from the prefetch,
    // as we try to cache each block without interruption.
    assertTrue(bc.getStats().getEvictionCount() > evictionsFirstPrefetch);
  }

  @Test
  public void testPrefetchDoesntInterruptInMemoryOnCapacity() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.95);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.01);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    ColumnFamilyDescriptor family =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setInMemory(true).build();
    cacheConf = new CacheConfig(conf, family, blockCache, ByteBuffAllocator.HEAP);
    Path storeFile = writeStoreFile("testPrefetchDoesntInterruptInMemoryOnCapacity", 10000);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    createReaderAndWaitForPrefetchInterruption(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    assertTrue(bc.getStats().getEvictedCount() > 200);
  }

  @Test
  public void testPrefetchMetricProgress() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchMetricsProgress", 100);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    String regionName = storeFile.getParent().getParent().getName();
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    MutableLong regionCachedSize = new MutableLong(0);
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    long waitedTime = Waiter.waitFor(conf, 300, () -> {
      if (bc.getBackingMap().size() > 0) {
        long currentSize = bc.getRegionCachedInfo().get().get(regionName);
        assertTrue(regionCachedSize.getValue() <= currentSize);
        LOG.debug("Logging progress of region caching: {}", currentSize);
        regionCachedSize.setValue(currentSize);
      }
      return bc.getBackingMap().size() == 6;
    });
  }

  // Reads all blocks of the given file after prefetch finishes, asserting that data and index
  // blocks are present in the block cache.
  private void readStoreFile(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (
        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertTrue(isCached);
      }
    });
  }

  private void readStoreFile(Path storeFilePath,
    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
    BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    long sizeForDataBlocks = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = readFunction.apply(reader, offset);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      validationFunction.accept(blockCacheKey, block);
      offset += block.getOnDiskSizeWithHeader();
    }
  }

  // Opens a reader, waits for the prefetch to complete and asserts that no file is marked as
  // fully cached in the bucket cache.
  private HFile.Reader createReaderAndWaitForPrefetchInterruption(Path storeFilePath)
    throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    assertEquals(0, BucketCache.getBucketCacheFromCacheConfig(cacheConf).get().getFullyCachedFiles()
      .get().size());

    return reader;
  }

  private Path writeStoreFile(String fname, int numKVs) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(fname, meta, numKVs);
  }

  private Path writeStoreFile(int numKVs, Path regionCFDir) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(meta, numKVs, regionCFDir);
  }

  private Path writeStoreFile(String fname, HFileContext context, int numKVs) throws IOException {
    return writeStoreFile(context, numKVs, new Path(TEST_UTIL.getDataTestDir(), fname));
  }

  // Writes a store file with numKVs random KeyValues into the given region/CF directory.
  private Path writeStoreFile(HFileContext context, int numKVs, Path regionCFDir)
    throws IOException {
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
      .withOutputDir(regionCFDir).withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    for (int i = 0; i < numKVs; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }
}