001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotEquals;
022import static org.junit.Assert.assertTrue;
023
024import java.io.File;
025import java.io.IOException;
026import java.util.Arrays;
027import java.util.Random;
028import java.util.concurrent.ThreadLocalRandom;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.KeyValue;
035import org.apache.hadoop.hbase.fs.HFileSystem;
036import org.apache.hadoop.hbase.io.hfile.CacheConfig;
037import org.apache.hadoop.hbase.io.hfile.HFile;
038import org.apache.hadoop.hbase.io.hfile.HFileContext;
039import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
040import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
041import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
042import org.apache.hadoop.hbase.testclassification.IOTests;
043import org.apache.hadoop.hbase.testclassification.LargeTests;
044import org.junit.Before;
045import org.junit.ClassRule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.junit.rules.TestName;
049import org.junit.runner.RunWith;
050import org.junit.runners.Parameterized;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054@RunWith(Parameterized.class)
055@Category({ IOTests.class, LargeTests.class })
056public class TestPrefetchPersistence {
057  @ClassRule
058  public static final HBaseClassTestRule CLASS_RULE =
059    HBaseClassTestRule.forClass(TestPrefetchPersistence.class);
060
061  public TestName name = new TestName();
062
063  @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
064  @SuppressWarnings("checkstyle:Indentation")
065  public static Iterable<Object[]> data() {
066    return Arrays.asList(new Object[][] { { 16 * 1024,
067      new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
068        28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
069        128 * 1024 + 1024 } } });
070  }
071
072  @Parameterized.Parameter(0)
073  public int constructedBlockSize;
074
075  @Parameterized.Parameter(1)
076  public int[] constructedBlockSizes;
077
078  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetchPersistence.class);
079
080  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
081
082  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
083  private static final int DATA_BLOCK_SIZE = 2048;
084  private static final int NUM_KV = 1000;
085
086  private Configuration conf;
087  private CacheConfig cacheConf;
088  private FileSystem fs;
089  String prefetchPersistencePath;
090  Path testDir;
091
092  BucketCache bucketCache;
093
094  final long capacitySize = 32 * 1024 * 1024;
095  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
096  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
097
098  @Before
099  public void setup() throws IOException {
100    conf = TEST_UTIL.getConfiguration();
101    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
102    testDir = TEST_UTIL.getDataTestDir();
103    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
104    fs = HFileSystem.get(conf);
105  }
106
107  @Test
108  public void testPrefetchPersistence() throws Exception {
109    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
110      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
111      testDir + "/bucket.persistence", 60 * 1000, conf);
112    bucketCache.waitForCacheInitialization(10000);
113    cacheConf = new CacheConfig(conf, bucketCache);
114
115    long usedSize = bucketCache.getAllocator().getUsedSize();
116    assertEquals(0, usedSize);
117    assertTrue(new File(testDir + "/bucket.cache").exists());
118    // Load Cache
119    Path storeFile = writeStoreFile("TestPrefetch0");
120    Path storeFile2 = writeStoreFile("TestPrefetch1");
121    readStoreFile(storeFile);
122    readStoreFile(storeFile2);
123    usedSize = bucketCache.getAllocator().getUsedSize();
124    assertNotEquals(0, usedSize);
125
126    bucketCache.shutdown();
127    assertTrue(new File(testDir + "/bucket.persistence").exists());
128    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
129      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
130      testDir + "/bucket.persistence", 60 * 1000, conf);
131    bucketCache.waitForCacheInitialization(10000);
132    cacheConf = new CacheConfig(conf, bucketCache);
133    assertTrue(usedSize != 0);
134    assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile.getName()));
135    assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile2.getName()));
136    TEST_UTIL.cleanupTestDir();
137  }
138
139  public void readStoreFile(Path storeFilePath) throws Exception {
140    // Open the file
141    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
142    while (!reader.prefetchComplete()) {
143      // Sleep for a bit
144      Thread.sleep(1000);
145    }
146  }
147
148  public Path writeStoreFile(String fname) throws IOException {
149    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
150    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
151    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
152      .withOutputDir(storeFileParentDir).withFileContext(meta).build();
153    Random rand = ThreadLocalRandom.current();
154    final int rowLen = 32;
155    for (int i = 0; i < NUM_KV; ++i) {
156      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
157      byte[] v = RandomKeyValueUtil.randomValue(rand);
158      int cfLen = rand.nextInt(k.length - rowLen + 1);
159      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
160        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
161      sfw.append(kv);
162    }
163
164    sfw.close();
165    return sfw.getPath();
166  }
167
168  public static KeyValue.Type generateKeyType(Random rand) {
169    if (rand.nextBoolean()) {
170      // Let's make half of KVs puts.
171      return KeyValue.Type.Put;
172    } else {
173      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
174      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
175        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
176          + "Probably the layout of KeyValue.Type has changed.");
177      }
178      return keyType;
179    }
180  }
181
182}