/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

@Category({ IOTests.class, MediumTests.class })
public class TestPrefetchWithBucketCache {

  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchWithBucketCache.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestPrefetchWithBucketCache.class);

  @Rule
  public TestName name = new TestName();

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  private static final int DATA_BLOCK_SIZE = 2048;
  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private BlockCache blockCache;

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
    fs = HFileSystem.get(conf);
    File testDir = new File(name.getMethodName());
    testDir.mkdir();
    conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:/" + testDir.getAbsolutePath() + "/bucket.cache");
  }

  @After
  public void tearDown() {
    File cacheFile = new File(name.getMethodName() + "/bucket.cache");
    File dir = new File(name.getMethodName());
    cacheFile.delete();
    dir.delete();
  }

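  /**
   * Opens the same store file multiple times: the first read should prefetch all data blocks, the
   * second should find them already cached and skip prefetching, and after one block is evicted
   * the third read should prefetch again and only re-cache the missing block.
   */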
  @Test
  public void testPrefetchDoesntOverwork() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork", 100);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);
    Map<BlockCacheKey, BucketEntry> snapshot = ImmutableMap.copyOf(bc.getBackingMap());
    // Reads the file again and checks that we are not prefetching it again
    LOG.debug("Second read, no prefetch should happen here.");
    readStoreFile(storeFile);
    // Makes sure the cache hasn't changed
    snapshot.entrySet().forEach(e -> {
      BucketEntry entry = bc.getBackingMap().get(e.getKey());
      assertNotNull(entry);
      assertEquals(e.getValue().getCachedTime(), entry.getCachedTime());
    });
    // Forcibly remove the first block from the backing map, so it gets cached again
    BlockCacheKey key = snapshot.keySet().stream().findFirst().get();
    LOG.debug("removing block {}", key);
    bc.getBackingMap().remove(key);
    bc.getFullyCachedFiles().get().remove(storeFile.getName());
    assertTrue(snapshot.size() > bc.getBackingMap().size());
    LOG.debug("Third read should prefetch again, as we removed one block for the file.");
    readStoreFile(storeFile);
    Waiter.waitFor(conf, 300, () -> snapshot.size() == bc.getBackingMap().size());
    assertTrue(snapshot.get(key).getCachedTime() < bc.getBackingMap().get(key).getCachedTime());
  }

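  /**
   * Splits a store file and opens a reader on the resulting reference file. The reference's block
   * keys should resolve to the parent file's cached blocks instead of adding duplicate entries to
   * the bucket cache.
   */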
  @Test
  public void testPrefetchRefsAfterSplit() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);

    Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchRefsAfterSplit");
    RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Path cfDir = new Path(regionDir, "cf");
    HRegionFileSystem regionFS =
      HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region);
    Path storeFile = writeStoreFile(100, cfDir);
    StoreFileTracker sft = StoreFileTrackerFactory.create(conf, true,
      StoreContext.getBuilder().withRegionFileSystem(regionFS).withFamilyStoreDirectoryPath(cfDir)
        .withCacheConfig(cacheConf).build());
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);

    // split the file and return references to the original file
    Random rand = ThreadLocalRandom.current();
    byte[] splitPoint = RandomKeyValueUtil.randomOrderedKey(rand, 50);
    HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true, sft);
    Path ref = regionFS.splitStoreFile(region, "cf", file, splitPoint, false,
      new ConstantSizeRegionSplitPolicy(), sft);
    HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true, sft);
    // Starts a reader for the ref. The ref should resolve to the original file blocks
    // and not duplicate blocks in the cache.
    refHsf.initReader();
    HFile.Reader reader = refHsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    // The ref file's block keys should actually resolve to the referred file's blocks,
    // so we should not see additional blocks in the cache.
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);

    BlockCacheKey refCacheKey = new BlockCacheKey(ref.getName(), 0);
    Cacheable result = bc.getBlock(refCacheKey, true, false, true);
    assertNotNull(result);
    BlockCacheKey fileCacheKey = new BlockCacheKey(file.getPath().getName(), 0);
    assertEquals(result, bc.getBlock(fileCacheKey, true, false, true));
    assertNull(bc.getBackingMap().get(refCacheKey));
    assertNotNull(bc.getBlockForReference(refCacheKey));
  }

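  /**
   * Uses a bucket cache that is too small for the file, so prefetch should get interrupted once
   * the cache fills up; a later full scan, which caches each block as it reads, should then cause
   * further evictions.
   */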
  @Test
  public void testPrefetchInterruptOnCapacity() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.95);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.01);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    createReaderAndWaitForPrefetchInterruption(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    long evictionsFirstPrefetch = bc.getStats().getEvictionCount();
    LOG.debug("evictions after first prefetch: {}", bc.getStats().getEvictionCount());
    HFile.Reader reader = createReaderAndWaitForPrefetchInterruption(storeFile);
    LOG.debug("evictions after second prefetch: {}", bc.getStats().getEvictionCount());
    assertTrue((bc.getStats().getEvictionCount() - evictionsFirstPrefetch) < 10);
    HFileScanner scanner = reader.getScanner(conf, true, true);
    scanner.seekTo();
    while (scanner.next()) {
      // do a full scan to force some evictions
      LOG.trace("Iterating the full scan to evict some blocks");
    }
    scanner.close();
    LOG.debug("evictions after scanner: {}", bc.getStats().getEvictionCount());
    // The scanner should have triggered at least 3x the evictions seen during prefetch,
    // as we try to cache each block without interruption.
    assertTrue(bc.getStats().getEvictionCount() > evictionsFirstPrefetch);
  }

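  /**
   * Same small-cache setup as above, but for an in-memory column family: prefetch should not be
   * interrupted when the cache fills up, so a large number of evictions is expected instead.
   */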
  @Test
  public void testPrefetchDoesntInterruptInMemoryOnCapacity() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.95);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.01);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    ColumnFamilyDescriptor family =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setInMemory(true).build();
    cacheConf = new CacheConfig(conf, family, blockCache, ByteBuffAllocator.HEAP);
    Path storeFile = writeStoreFile("testPrefetchDoesntInterruptInMemoryOnCapacity", 10000);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    createReaderAndWaitForPrefetchInterruption(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    assertTrue(bc.getStats().getEvictedCount() > 200);
  }

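  /**
   * Checks that the per-region cached-size metric only grows while the file blocks are being
   * prefetched into the bucket cache.
   */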
  @Test
  public void testPrefetchMetricProgress() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchMetricsProgress", 100);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    String regionName = storeFile.getParent().getParent().getName();
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    MutableLong regionCachedSize = new MutableLong(0);
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    long waitedTime = Waiter.waitFor(conf, 300, () -> {
      if (bc.getBackingMap().size() > 0) {
        long currentSize = bc.getRegionCachedInfo().get().get(regionName);
        assertTrue(regionCachedSize.getValue() <= currentSize);
        LOG.debug("Logging progress of region caching: {}", currentSize);
        regionCachedSize.setValue(currentSize);
      }
      return bc.getBackingMap().size() == 6;
    });
  }

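  /** Reads all blocks of the file and asserts that DATA and index blocks are in the cache. */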
  private void readStoreFile(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (
        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertTrue(isCached);
      }
    });
  }

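  /**
   * Opens a reader, waits for prefetch to complete, then walks every block up to the load-on-open
   * section, applying the given read and validation functions to each block.
   */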
  private void readStoreFile(Path storeFilePath,
    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
    BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    long sizeForDataBlocks = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = readFunction.apply(reader, offset);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      validationFunction.accept(blockCacheKey, block);
      offset += block.getOnDiskSizeWithHeader();
    }
  }

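  /**
   * Opens a reader, waits for prefetch to complete and asserts that no file was marked as fully
   * cached, i.e. prefetch got interrupted before caching the whole file.
   */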
  private HFile.Reader createReaderAndWaitForPrefetchInterruption(Path storeFilePath)
    throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    assertEquals(0, BucketCache.getBucketCacheFromCacheConfig(cacheConf).get().getFullyCachedFiles()
      .get().size());

    return reader;
  }

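  /** Helper overloads to write a store file with {@code numKVs} random KeyValues. */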
  private Path writeStoreFile(String fname, int numKVs) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(fname, meta, numKVs);
  }

  private Path writeStoreFile(int numKVs, Path regionCFDir) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(meta, numKVs, regionCFDir);
  }

  private Path writeStoreFile(String fname, HFileContext context, int numKVs) throws IOException {
    return writeStoreFile(context, numKVs, new Path(TEST_UTIL.getDataTestDir(), fname));
  }

  private Path writeStoreFile(HFileContext context, int numKVs, Path regionCFDir)
    throws IOException {
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
      .withOutputDir(regionCFDir).withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    for (int i = 0; i < numKVs; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

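  /** Returns Put for roughly half of the KVs and a random valid non-Put type otherwise. */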
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }
}