/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

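/**
 * Tests for HFile block prefetching when a file-backed BucketCache is in use: verifies that
 * prefetch does not re-cache files that are already fully cached, and checks how prefetch behaves
 * when the cache runs out of capacity.
 */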
@Category({ IOTests.class, MediumTests.class })
public class TestPrefetchWithBucketCache {

  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchWithBucketCache.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestPrefetchWithBucketCache.class);

  @Rule
  public TestName name = new TestName();

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  private static final int DATA_BLOCK_SIZE = 2048;
  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private BlockCache blockCache;

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
    fs = HFileSystem.get(conf);
    File testDir = new File(name.getMethodName());
    testDir.mkdir();
    conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:/" + testDir.getAbsolutePath() + "/bucket.cache");
  }

  @After
  public void tearDown() {
    File cacheFile = new File(name.getMethodName() + "/bucket.cache");
    File dir = new File(name.getMethodName());
    cacheFile.delete();
    dir.delete();
  }

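  /**
   * Writes a small file, lets prefetch cache all of its DATA blocks, then verifies that a second
   * read does not re-cache any of them (cached timestamps stay unchanged). After one block is
   * evicted manually, a third read should prefetch just that missing block back into the cache.
   */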
  @Test
  public void testPrefetchDoesntOverwork() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork", 100);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    readStoreFile(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    // Our file should have 6 DATA blocks. We should wait for all of them to be cached
    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);
    Map<BlockCacheKey, BucketEntry> snapshot = ImmutableMap.copyOf(bc.getBackingMap());
    // Read the file again and check that we are not prefetching it again
    LOG.debug("Second read, no prefetch should happen here.");
    readStoreFile(storeFile);
    // Make sure the cache hasn't changed
    snapshot.entrySet().forEach(e -> {
      BucketEntry entry = bc.getBackingMap().get(e.getKey());
      assertNotNull(entry);
      assertEquals(e.getValue().getCachedTime(), entry.getCachedTime());
    });
    // Forcibly remove the first block from the bucket cache backing map, in order to cause it to
    // be cached again
    BlockCacheKey key = snapshot.keySet().stream().findFirst().get();
    LOG.debug("removing block {}", key);
    bc.getBackingMap().remove(key);
    bc.getFullyCachedFiles().get().remove(storeFile.getName());
    assertTrue(snapshot.size() > bc.getBackingMap().size());
    LOG.debug("Third read should prefetch again, as we removed one block for the file.");
    readStoreFile(storeFile);
    Waiter.waitFor(conf, 300, () -> snapshot.size() == bc.getBackingMap().size());
    assertTrue(snapshot.get(key).getCachedTime() < bc.getBackingMap().get(key).getCachedTime());
  }

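  /**
   * Configures a bucket cache too small to hold the whole file, so prefetch should get interrupted
   * once the cache fills up, causing only a handful of extra evictions on the second prefetch. A
   * subsequent full scan, which caches blocks without interruption, should trigger more evictions.
   */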
  @Test
  public void testPrefetchInterruptOnCapacity() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.95);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.01);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
    Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    createReaderAndWaitForPrefetchInterruption(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    long evictionsFirstPrefetch = bc.getStats().getEvictionCount();
    LOG.debug("evictions after first prefetch: {}", bc.getStats().getEvictionCount());
    HFile.Reader reader = createReaderAndWaitForPrefetchInterruption(storeFile);
    LOG.debug("evictions after second prefetch: {}", bc.getStats().getEvictionCount());
    assertTrue((bc.getStats().getEvictionCount() - evictionsFirstPrefetch) < 10);
    HFileScanner scanner = reader.getScanner(conf, true, true);
    scanner.seekTo();
    while (scanner.next()) {
      // do a full scan to force some evictions
      LOG.trace("Iterating the full scan to evict some blocks");
    }
    scanner.close();
    LOG.debug("evictions after scanner: {}", bc.getStats().getEvictionCount());
    // The scanner should have triggered at least 3x the evictions from the prefetch,
    // as we try to cache each block without interruption.
    assertTrue(bc.getStats().getEvictionCount() > evictionsFirstPrefetch);
  }

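  /**
   * Same undersized bucket cache, but the column family is marked as in-memory, so prefetch should
   * not be interrupted on capacity and keeps caching, which shows up as a large number of
   * evictions during the initial prefetch.
   */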
  @Test
  public void testPrefetchDoesntInterruptInMemoryOnCapacity() throws Exception {
    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
    conf.setDouble("hbase.bucketcache.minfactor", 0.95);
    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.01);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    ColumnFamilyDescriptor family =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setInMemory(true).build();
    cacheConf = new CacheConfig(conf, family, blockCache, ByteBuffAllocator.HEAP);
    Path storeFile = writeStoreFile("testPrefetchDoesntInterruptInMemoryOnCapacity", 10000);
    // Prefetches the file blocks
    LOG.debug("First read should prefetch the blocks.");
    createReaderAndWaitForPrefetchInterruption(storeFile);
    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
    assertTrue(bc.getStats().getEvictedCount() > 200);
  }

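  /**
   * Reads all blocks of the given store file and asserts that DATA and index blocks end up in the
   * block cache.
   */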
  private void readStoreFile(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (
        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertTrue(isCached);
      }
    });
  }

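  /**
   * Opens the file, waits for prefetch to finish, then iterates all blocks up to the load-on-open
   * section, applying the given read and validation functions to each block.
   */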
  private void readStoreFile(Path storeFilePath,
    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
    BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = readFunction.apply(reader, offset);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      validationFunction.accept(blockCacheKey, block);
      offset += block.getOnDiskSizeWithHeader();
    }
  }

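  /**
   * Opens a reader and waits for prefetch to finish. Callers use a cache that cannot fit the whole
   * file, so the file should not be marked as fully cached.
   */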
  private HFile.Reader createReaderAndWaitForPrefetchInterruption(Path storeFilePath)
    throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    assertEquals(0, BucketCache.getBucketCacheFromCacheConfig(cacheConf).get().getFullyCachedFiles()
      .get().size());

    return reader;
  }

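  /** Writes a store file using the default test data block size. */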
  private Path writeStoreFile(String fname, int numKVs) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(fname, meta, numKVs);
  }

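  /**
   * Writes {@code numKVs} random ordered KeyValues into a new store file under the test data dir.
   */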
  private Path writeStoreFile(String fname, HFileContext context, int numKVs) throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
      .withOutputDir(storeFileParentDir).withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    for (int i = 0; i < numKVs; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

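  /**
   * Returns Put for roughly half of the calls, otherwise a random valid non-Minimum/Maximum type,
   * failing fast if the KeyValue.Type layout ever changes.
   */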
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

}