001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
022import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
023import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY;
024import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION;
025import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION_DEFAULT_VALUE;
026import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
027import static org.hamcrest.MatcherAssert.assertThat;
028import static org.hamcrest.Matchers.allOf;
029import static org.hamcrest.Matchers.hasItem;
030import static org.hamcrest.Matchers.hasItems;
031import static org.hamcrest.Matchers.not;
032import static org.junit.Assert.assertEquals;
033import static org.junit.Assert.assertFalse;
034import static org.junit.Assert.assertTrue;
035import static org.junit.Assert.fail;
036
037import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
038import io.opentelemetry.sdk.trace.data.SpanData;
039import java.io.IOException;
040import java.util.List;
041import java.util.Random;
042import java.util.concurrent.ScheduledThreadPoolExecutor;
043import java.util.concurrent.ThreadLocalRandom;
044import java.util.concurrent.TimeUnit;
045import java.util.function.BiConsumer;
046import java.util.function.BiFunction;
047import java.util.function.Consumer;
048import org.apache.commons.lang3.mutable.MutableInt;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.fs.FileSystem;
051import org.apache.hadoop.fs.Path;
052import org.apache.hadoop.hbase.HBaseClassTestRule;
053import org.apache.hadoop.hbase.HBaseConfiguration;
054import org.apache.hadoop.hbase.HBaseTestingUtil;
055import org.apache.hadoop.hbase.KeyValue;
056import org.apache.hadoop.hbase.MatcherPredicate;
057import org.apache.hadoop.hbase.TableName;
058import org.apache.hadoop.hbase.Waiter;
059import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
060import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
061import org.apache.hadoop.hbase.client.RegionInfo;
062import org.apache.hadoop.hbase.client.RegionInfoBuilder;
063import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
064import org.apache.hadoop.hbase.fs.HFileSystem;
065import org.apache.hadoop.hbase.io.ByteBuffAllocator;
066import org.apache.hadoop.hbase.io.HFileLink;
067import org.apache.hadoop.hbase.io.compress.Compression;
068import org.apache.hadoop.hbase.regionserver.BloomType;
069import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
070import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
071import org.apache.hadoop.hbase.regionserver.HStoreFile;
072import org.apache.hadoop.hbase.regionserver.PrefetchExecutorNotifier;
073import org.apache.hadoop.hbase.regionserver.StoreContext;
074import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
075import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
076import org.apache.hadoop.hbase.regionserver.TestHStoreFile;
077import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
078import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
079import org.apache.hadoop.hbase.testclassification.IOTests;
080import org.apache.hadoop.hbase.testclassification.MediumTests;
081import org.apache.hadoop.hbase.trace.TraceUtil;
082import org.apache.hadoop.hbase.util.Bytes;
083import org.apache.hadoop.hbase.util.CommonFSUtils;
084import org.apache.hadoop.hbase.util.Pair;
085import org.junit.Before;
086import org.junit.ClassRule;
087import org.junit.Rule;
088import org.junit.Test;
089import org.junit.experimental.categories.Category;
090import org.slf4j.Logger;
091import org.slf4j.LoggerFactory;
092
093@Category({ IOTests.class, MediumTests.class })
094public class TestPrefetch {
095  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetch.class);
096
097  @ClassRule
098  public static final HBaseClassTestRule CLASS_RULE =
099    HBaseClassTestRule.forClass(TestPrefetch.class);
100
101  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
102
103  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
104  private static final int DATA_BLOCK_SIZE = 2048;
105  private static final int NUM_KV = 1000;
106  private Configuration conf;
107  private CacheConfig cacheConf;
108  private FileSystem fs;
109  private BlockCache blockCache;
110
111  @Rule
112  public OpenTelemetryRule otelRule = OpenTelemetryRule.create();
113
114  @Before
115  public void setUp() throws IOException, InterruptedException {
116    conf = TEST_UTIL.getConfiguration();
117    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
118    fs = HFileSystem.get(conf);
119    blockCache = BlockCacheFactory.createBlockCache(conf);
120    cacheConf = new CacheConfig(conf, blockCache);
121  }
122
123  @Test
124  public void testPrefetchSetInHCDWorks() {
125    ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
126      .newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true).build();
127    Configuration c = HBaseConfiguration.create();
128    assertFalse(c.getBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, false));
129    CacheConfig cc = new CacheConfig(c, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP);
130    assertTrue(cc.shouldPrefetchOnOpen());
131  }
132
133  @Test
134  public void testPrefetchBlockCacheDisabled() throws Exception {
135    ScheduledThreadPoolExecutor poolExecutor =
136      (ScheduledThreadPoolExecutor) PrefetchExecutor.getExecutorPool();
137    long totalCompletedBefore = poolExecutor.getCompletedTaskCount();
138    long queueBefore = poolExecutor.getQueue().size();
139    ColumnFamilyDescriptor columnFamilyDescriptor =
140      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true)
141        .setBlockCacheEnabled(false).build();
142    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
143    CacheConfig cacheConfig =
144      new CacheConfig(conf, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP);
145    Path storeFile = writeStoreFile("testPrefetchBlockCacheDisabled", meta, cacheConfig);
146    readStoreFile(storeFile, (r, o) -> {
147      HFileBlock block = null;
148      try {
149        block = r.readBlock(o, -1, false, true, false, true, null, null);
150      } catch (IOException e) {
151        fail(e.getMessage());
152      }
153      return block;
154    }, (key, block) -> {
155      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
156      if (
157        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
158          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
159      ) {
160        assertFalse(isCached);
161      }
162    }, cacheConfig);
163    assertEquals(totalCompletedBefore + queueBefore,
164      poolExecutor.getCompletedTaskCount() + poolExecutor.getQueue().size());
165  }
166
167  @Test
168  public void testPrefetchHeapUsageAboveThreshold() throws Exception {
169    ColumnFamilyDescriptor columnFamilyDescriptor =
170      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true)
171        .setBlockCacheEnabled(true).build();
172    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
173    Configuration newConf = new Configuration(conf);
174    newConf.setDouble(CacheConfig.PREFETCH_HEAP_USAGE_THRESHOLD, 0.1);
175    CacheConfig cacheConfig =
176      new CacheConfig(newConf, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP);
177    Path storeFile = writeStoreFile("testPrefetchHeapUsageAboveThreshold", meta, cacheConfig);
178    MutableInt cachedCount = new MutableInt(0);
179    MutableInt unCachedCount = new MutableInt(0);
180    readStoreFile(storeFile, (r, o) -> {
181      HFileBlock block = null;
182      try {
183        block = r.readBlock(o, -1, false, true, false, true, null, null);
184      } catch (IOException e) {
185        fail(e.getMessage());
186      }
187      return block;
188    }, (key, block) -> {
189      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
190      if (
191        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
192          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
193      ) {
194        if (isCached) {
195          cachedCount.increment();
196        } else {
197          unCachedCount.increment();
198        }
199      }
200    }, cacheConfig);
201    assertTrue(unCachedCount.compareTo(cachedCount) > 0);
202  }
203
204  @Test
205  public void testPrefetch() throws Exception {
206    TraceUtil.trace(() -> {
207      Path storeFile = writeStoreFile("TestPrefetch");
208      readStoreFile(storeFile);
209    }, "testPrefetch");
210
211    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<>(otelRule::getSpans,
212      hasItems(hasName("testPrefetch"), hasName("PrefetchExecutor.request"))));
213    final List<SpanData> spans = otelRule.getSpans();
214    if (LOG.isDebugEnabled()) {
215      StringTraceRenderer renderer = new StringTraceRenderer(spans);
216      renderer.render(LOG::debug);
217    }
218
219    final SpanData testSpan = spans.stream().filter(hasName("testPrefetch")::matches).findFirst()
220      .orElseThrow(AssertionError::new);
221    assertThat("prefetch spans happen on their own threads, detached from file open.", spans,
222      hasItem(allOf(hasName("PrefetchExecutor.request"), not(hasParentSpanId(testSpan)))));
223  }
224
225  @Test
226  public void testPrefetchRace() throws Exception {
227    for (int i = 0; i < 10; i++) {
228      Path storeFile = writeStoreFile("TestPrefetchRace-" + i);
229      readStoreFileLikeScanner(storeFile);
230    }
231  }
232
233  /**
234   * Read a storefile in the same manner as a scanner -- using non-positional reads and without
235   * waiting for prefetch to complete.
236   */
237  private void readStoreFileLikeScanner(Path storeFilePath) throws Exception {
238    // Open the file
239    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
240    do {
241      long offset = 0;
242      while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
243        HFileBlock block =
244          reader.readBlock(offset, -1, false, /* pread= */false, false, true, null, null);
245        offset += block.getOnDiskSizeWithHeader();
246      }
247    } while (!reader.prefetchComplete());
248  }
249
250  private void readStoreFile(Path storeFilePath) throws Exception {
251    readStoreFile(storeFilePath, (r, o) -> {
252      HFileBlock block = null;
253      try {
254        block = r.readBlock(o, -1, false, true, false, true, null, null);
255      } catch (IOException e) {
256        fail(e.getMessage());
257      }
258      return block;
259    }, (key, block) -> {
260      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
261      if (
262        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
263          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
264      ) {
265        assertTrue(isCached);
266      }
267    });
268  }
269
270  private void readStoreFileCacheOnly(Path storeFilePath) throws Exception {
271    readStoreFile(storeFilePath, (r, o) -> {
272      HFileBlock block = null;
273      try {
274        block = r.readBlock(o, -1, false, true, false, true, null, null, true);
275      } catch (IOException e) {
276        fail(e.getMessage());
277      }
278      return block;
279    }, (key, block) -> {
280      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
281      if (block.getBlockType() == BlockType.DATA) {
282        assertFalse(block.isUnpacked());
283      } else if (
284        block.getBlockType() == BlockType.ROOT_INDEX
285          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
286      ) {
287        assertTrue(block.isUnpacked());
288      }
289      assertTrue(isCached);
290    });
291  }
292
293  private void readStoreFile(Path storeFilePath,
294    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
295    BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
296    readStoreFile(storeFilePath, readFunction, validationFunction, cacheConf);
297  }
298
299  private void readStoreFile(Path storeFilePath,
300    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
301    BiConsumer<BlockCacheKey, HFileBlock> validationFunction, CacheConfig cacheConfig)
302    throws Exception {
303    // Open the file
304    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);
305
306    while (!reader.prefetchComplete()) {
307      // Sleep for a bit
308      Thread.sleep(1000);
309    }
310    long offset = 0;
311    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
312      HFileBlock block = readFunction.apply(reader, offset);
313      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
314      validationFunction.accept(blockCacheKey, block);
315      offset += block.getOnDiskSizeWithHeader();
316    }
317  }
318
319  @Test
320  public void testPrefetchCompressed() throws Exception {
321    conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
322    cacheConf = new CacheConfig(conf, blockCache);
323    HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
324      .withBlockSize(DATA_BLOCK_SIZE).build();
325    Path storeFile = writeStoreFile("TestPrefetchCompressed", context);
326    readStoreFileCacheOnly(storeFile);
327    conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
328  }
329
330  @Test
331  public void testPrefetchDoesntSkipRefs() throws Exception {
332    testPrefetchWhenRefs(false, c -> {
333      boolean isCached = c != null;
334      assertTrue(isCached);
335    });
336  }
337
338  @Test
339  public void testOnConfigurationChange() {
340    PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
341    conf.setInt(PREFETCH_DELAY, 40000);
342    prefetchExecutorNotifier.onConfigurationChange(conf);
343    assertEquals(prefetchExecutorNotifier.getPrefetchDelay(), 40000);
344
345    // restore
346    conf.setInt(PREFETCH_DELAY, 30000);
347    prefetchExecutorNotifier.onConfigurationChange(conf);
348    assertEquals(prefetchExecutorNotifier.getPrefetchDelay(), 30000);
349
350    conf.setInt(PREFETCH_DELAY, 1000);
351    prefetchExecutorNotifier.onConfigurationChange(conf);
352  }
353
354  @Test
355  public void testPrefetchWithDelay() throws Exception {
356    // Configure custom delay
357    PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
358    conf.setInt(PREFETCH_DELAY, 25000);
359    conf.setFloat(PREFETCH_DELAY_VARIATION, 0.0f);
360    prefetchExecutorNotifier.onConfigurationChange(conf);
361
362    HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
363      .withBlockSize(DATA_BLOCK_SIZE).build();
364    Path storeFile = writeStoreFile("TestPrefetchWithDelay", context);
365    long startTime = System.currentTimeMillis();
366    HFile.Reader reader = HFile.createReader(fs, storeFile, cacheConf, true, conf);
367
368    // Wait for 20 seconds, no thread should start prefetch
369    Thread.sleep(20000);
370    assertFalse("Prefetch threads should not be running at this point", reader.prefetchStarted());
371    long timeout = 10000;
372    Waiter.waitFor(conf, 10000, () -> (reader.prefetchStarted() || reader.prefetchComplete()));
373
374    assertTrue(reader.prefetchStarted() || reader.prefetchComplete());
375
376    assertTrue("Prefetch should start post configured delay",
377      getElapsedTime(startTime) > PrefetchExecutor.getPrefetchDelay());
378
379    conf.setInt(PREFETCH_DELAY, 1000);
380    conf.setFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
381    prefetchExecutorNotifier.onConfigurationChange(conf);
382  }
383
384  @Test
385  public void testPrefetchWhenNoBlockCache() throws Exception {
386    PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
387    try {
388      // Set a delay to max, as we don't need to have the thread running, but rather
389      // assert that it never gets scheduled
390      conf.setInt(PREFETCH_DELAY, Integer.MAX_VALUE);
391      conf.setFloat(PREFETCH_DELAY_VARIATION, 0.0f);
392      prefetchExecutorNotifier.onConfigurationChange(conf);
393
394      HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
395        .withBlockSize(DATA_BLOCK_SIZE).build();
396      Path storeFile = writeStoreFile("testPrefetchWhenNoBlockCache", context);
397      HFile.createReader(fs, storeFile, new CacheConfig(conf), true, conf);
398      assertEquals(0, PrefetchExecutor.getPrefetchFutures().size());
399    } finally {
400      conf.setInt(PREFETCH_DELAY, 1000);
401      conf.setFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
402      prefetchExecutorNotifier.onConfigurationChange(conf);
403    }
404  }
405
406  @Test
407  public void testPrefetchDoesntSkipHFileLink() throws Exception {
408    testPrefetchWhenHFileLink(c -> {
409      boolean isCached = c != null;
410      assertTrue(isCached);
411    });
412  }
413
414  private void testPrefetchWhenRefs(boolean compactionEnabled, Consumer<Cacheable> test)
415    throws Exception {
416    cacheConf = new CacheConfig(conf, blockCache);
417    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
418    Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchSkipRefs");
419    RegionInfo region =
420      RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchSkipRefs")).build();
421    Path regionDir = new Path(tableDir, region.getEncodedName());
422    Pair<Path, byte[]> fileWithSplitPoint =
423      writeStoreFileForSplit(new Path(regionDir, "cf"), context);
424    Path storeFile = fileWithSplitPoint.getFirst();
425    HRegionFileSystem regionFS =
426      HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region);
427    StoreFileTracker sft = StoreFileTrackerFactory.create(conf, true,
428      StoreContext.getBuilder().withFamilyStoreDirectoryPath(new Path(regionDir, "cf"))
429        .withRegionFileSystem(regionFS).build());
430    HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true, sft);
431    Path ref = regionFS.splitStoreFile(region, "cf", file, fileWithSplitPoint.getSecond(), false,
432      new ConstantSizeRegionSplitPolicy(), sft);
433    conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, compactionEnabled);
434    HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true, sft);
435    refHsf.initReader();
436    HFile.Reader reader = refHsf.getReader().getHFileReader();
437    while (!reader.prefetchComplete()) {
438      // Sleep for a bit
439      Thread.sleep(1000);
440    }
441    long offset = 0;
442    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
443      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
444      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
445      if (block.getBlockType() == BlockType.DATA) {
446        test.accept(blockCache.getBlock(blockCacheKey, true, false, true));
447      }
448      offset += block.getOnDiskSizeWithHeader();
449    }
450  }
451
452  private void testPrefetchWhenHFileLink(Consumer<Cacheable> test) throws Exception {
453    cacheConf = new CacheConfig(conf, blockCache);
454    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
455    Path testDir = TEST_UTIL.getDataTestDir("testPrefetchWhenHFileLink");
456    final RegionInfo hri =
457      RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchWhenHFileLink")).build();
458    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
459    Configuration testConf = new Configuration(this.conf);
460    CommonFSUtils.setRootDir(testConf, testDir);
461    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
462      CommonFSUtils.getTableDir(testDir, hri.getTable()), hri);
463
464    // Make a store file and write data to it.
465    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
466      .withFilePath(regionFs.createTempName()).withFileContext(context).build();
467    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("testPrefetchWhenHFileLink"),
468      Bytes.toBytes("testPrefetchWhenHFileLink"));
469
470    Path storeFilePath = regionFs.commitStoreFile("cf", writer.getPath());
471    final RegionInfo dstHri =
472      RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchWhenHFileLink")).build();
473    HRegionFileSystem dstRegionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
474      CommonFSUtils.getTableDir(testDir, dstHri.getTable()), dstHri);
475    Path dstPath = new Path(regionFs.getTableDir(), new Path(dstHri.getRegionNameAsString(), "cf"));
476    StoreFileTracker sft = StoreFileTrackerFactory.create(testConf, false,
477      StoreContext.getBuilder()
478        .withFamilyStoreDirectoryPath(new Path(dstRegionFs.getRegionDir(), "cf"))
479        .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of("cf"))
480        .withRegionFileSystem(dstRegionFs).build());
481    sft.createHFileLink(hri.getTable(), hri.getEncodedName(), storeFilePath.getName(), true);
482    Path linkFilePath =
483      new Path(dstPath, HFileLink.createHFileLinkName(hri, storeFilePath.getName()));
484
485    // Try to open store file from link
486    StoreFileInfo storeFileInfo = sft.getStoreFileInfo(linkFilePath, true);
487    HStoreFile hsf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
488    assertTrue(storeFileInfo.isLink());
489
490    hsf.initReader();
491    HFile.Reader reader = hsf.getReader().getHFileReader();
492    while (!reader.prefetchComplete()) {
493      // Sleep for a bit
494      Thread.sleep(1000);
495    }
496    long offset = 0;
497    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
498      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
499      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
500      if (block.getBlockType() == BlockType.DATA) {
501        test.accept(blockCache.getBlock(blockCacheKey, true, false, true));
502      }
503      offset += block.getOnDiskSizeWithHeader();
504    }
505  }
506
507  private Path writeStoreFile(String fname) throws IOException {
508    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
509    return writeStoreFile(fname, meta);
510  }
511
512  private Path writeStoreFile(String fname, HFileContext context) throws IOException {
513    return writeStoreFile(fname, context, cacheConf);
514  }
515
516  private Path writeStoreFile(String fname, HFileContext context, CacheConfig cacheConfig)
517    throws IOException {
518    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
519    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConfig, fs)
520      .withOutputDir(storeFileParentDir).withFileContext(context).build();
521    Random rand = ThreadLocalRandom.current();
522    final int rowLen = 32;
523    for (int i = 0; i < NUM_KV; ++i) {
524      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
525      byte[] v = RandomKeyValueUtil.randomValue(rand);
526      int cfLen = rand.nextInt(k.length - rowLen + 1);
527      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
528        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
529      sfw.append(kv);
530    }
531
532    sfw.close();
533    return sfw.getPath();
534  }
535
536  private Pair<Path, byte[]> writeStoreFileForSplit(Path storeDir, HFileContext context)
537    throws IOException {
538    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(storeDir)
539      .withFileContext(context).build();
540    Random rand = ThreadLocalRandom.current();
541    final int rowLen = 32;
542    byte[] splitPoint = null;
543    for (int i = 0; i < NUM_KV; ++i) {
544      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
545      byte[] v = RandomKeyValueUtil.randomValue(rand);
546      int cfLen = rand.nextInt(k.length - rowLen + 1);
547      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
548        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
549      sfw.append(kv);
550      if (i == NUM_KV / 2) {
551        splitPoint = k;
552      }
553    }
554    sfw.close();
555    return new Pair(sfw.getPath(), splitPoint);
556  }
557
558  public static KeyValue.Type generateKeyType(Random rand) {
559    if (rand.nextBoolean()) {
560      // Let's make half of KVs puts.
561      return KeyValue.Type.Put;
562    } else {
563      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
564      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
565        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
566          + "Probably the layout of KeyValue.Type has changed.");
567      }
568      return keyType;
569    }
570  }
571
572  private long getElapsedTime(long startTime) {
573    return System.currentTimeMillis() - startTime;
574  }
575}