001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; 021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; 022import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY; 023import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY; 024import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION; 025import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION_DEFAULT_VALUE; 026import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION; 027import static org.hamcrest.MatcherAssert.assertThat; 028import static org.hamcrest.Matchers.allOf; 029import static org.hamcrest.Matchers.hasItem; 030import static org.hamcrest.Matchers.hasItems; 031import static org.hamcrest.Matchers.not; 032import static org.junit.Assert.assertEquals; 033import static org.junit.Assert.assertFalse; 034import static org.junit.Assert.assertTrue; 035import static org.junit.Assert.fail; 036 037import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; 038import io.opentelemetry.sdk.trace.data.SpanData; 039import java.io.IOException; 040import java.util.List; 041import java.util.Random; 042import java.util.concurrent.ScheduledThreadPoolExecutor; 043import java.util.concurrent.ThreadLocalRandom; 044import java.util.concurrent.TimeUnit; 045import java.util.function.BiConsumer; 046import java.util.function.BiFunction; 047import java.util.function.Consumer; 048import org.apache.commons.lang3.mutable.MutableInt; 049import org.apache.hadoop.conf.Configuration; 050import org.apache.hadoop.fs.FileSystem; 051import org.apache.hadoop.fs.Path; 052import org.apache.hadoop.hbase.HBaseClassTestRule; 053import org.apache.hadoop.hbase.HBaseConfiguration; 054import org.apache.hadoop.hbase.HBaseTestingUtil; 055import org.apache.hadoop.hbase.KeyValue; 056import org.apache.hadoop.hbase.MatcherPredicate; 057import org.apache.hadoop.hbase.TableName; 058import org.apache.hadoop.hbase.Waiter; 059import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 060import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 061import org.apache.hadoop.hbase.client.RegionInfo; 062import org.apache.hadoop.hbase.client.RegionInfoBuilder; 063import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; 064import org.apache.hadoop.hbase.fs.HFileSystem; 065import org.apache.hadoop.hbase.io.ByteBuffAllocator; 066import org.apache.hadoop.hbase.io.HFileLink; 067import org.apache.hadoop.hbase.io.compress.Compression; 068import org.apache.hadoop.hbase.regionserver.BloomType; 069import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; 070import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; 071import org.apache.hadoop.hbase.regionserver.HStoreFile; 072import org.apache.hadoop.hbase.regionserver.PrefetchExecutorNotifier; 073import org.apache.hadoop.hbase.regionserver.StoreContext; 074import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 075import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 076import org.apache.hadoop.hbase.regionserver.TestHStoreFile; 077import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; 078import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; 079import org.apache.hadoop.hbase.testclassification.IOTests; 080import org.apache.hadoop.hbase.testclassification.MediumTests; 081import org.apache.hadoop.hbase.trace.TraceUtil; 082import org.apache.hadoop.hbase.util.Bytes; 083import org.apache.hadoop.hbase.util.CommonFSUtils; 084import org.apache.hadoop.hbase.util.Pair; 085import org.junit.Before; 086import org.junit.ClassRule; 087import org.junit.Rule; 088import org.junit.Test; 089import org.junit.experimental.categories.Category; 090import org.slf4j.Logger; 091import org.slf4j.LoggerFactory; 092 093@Category({ IOTests.class, MediumTests.class }) 094public class TestPrefetch { 095 private static final Logger LOG = LoggerFactory.getLogger(TestPrefetch.class); 096 097 @ClassRule 098 public static final HBaseClassTestRule CLASS_RULE = 099 HBaseClassTestRule.forClass(TestPrefetch.class); 100 101 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 102 103 private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2; 104 private static final int DATA_BLOCK_SIZE = 2048; 105 private static final int NUM_KV = 1000; 106 private Configuration conf; 107 private CacheConfig cacheConf; 108 private FileSystem fs; 109 private BlockCache blockCache; 110 111 @Rule 112 public OpenTelemetryRule otelRule = OpenTelemetryRule.create(); 113 114 @Before 115 public void setUp() throws IOException, InterruptedException { 116 conf = TEST_UTIL.getConfiguration(); 117 conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); 118 fs = HFileSystem.get(conf); 119 blockCache = BlockCacheFactory.createBlockCache(conf); 120 cacheConf = new CacheConfig(conf, blockCache); 121 } 122 123 @Test 124 public void testPrefetchSetInHCDWorks() { 125 ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder 126 .newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true).build(); 127 Configuration c = HBaseConfiguration.create(); 128 assertFalse(c.getBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, false)); 129 CacheConfig cc = new CacheConfig(c, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP); 130 assertTrue(cc.shouldPrefetchOnOpen()); 131 } 132 133 @Test 134 public void testPrefetchBlockCacheDisabled() throws Exception { 135 ScheduledThreadPoolExecutor poolExecutor = 136 (ScheduledThreadPoolExecutor) PrefetchExecutor.getExecutorPool(); 137 long totalCompletedBefore = poolExecutor.getCompletedTaskCount(); 138 long queueBefore = poolExecutor.getQueue().size(); 139 ColumnFamilyDescriptor columnFamilyDescriptor = 140 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true) 141 .setBlockCacheEnabled(false).build(); 142 HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); 143 CacheConfig cacheConfig = 144 new CacheConfig(conf, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP); 145 Path storeFile = writeStoreFile("testPrefetchBlockCacheDisabled", meta, cacheConfig); 146 readStoreFile(storeFile, (r, o) -> { 147 HFileBlock block = null; 148 try { 149 block = r.readBlock(o, -1, false, true, false, true, null, null); 150 } catch (IOException e) { 151 fail(e.getMessage()); 152 } 153 return block; 154 }, (key, block) -> { 155 boolean isCached = blockCache.getBlock(key, true, false, true) != null; 156 if ( 157 block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX 158 || block.getBlockType() == BlockType.INTERMEDIATE_INDEX 159 ) { 160 assertFalse(isCached); 161 } 162 }, cacheConfig); 163 assertEquals(totalCompletedBefore + queueBefore, 164 poolExecutor.getCompletedTaskCount() + poolExecutor.getQueue().size()); 165 } 166 167 @Test 168 public void testPrefetchHeapUsageAboveThreshold() throws Exception { 169 ColumnFamilyDescriptor columnFamilyDescriptor = 170 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true) 171 .setBlockCacheEnabled(true).build(); 172 HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); 173 Configuration newConf = new Configuration(conf); 174 newConf.setDouble(CacheConfig.PREFETCH_HEAP_USAGE_THRESHOLD, 0.1); 175 CacheConfig cacheConfig = 176 new CacheConfig(newConf, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP); 177 Path storeFile = writeStoreFile("testPrefetchHeapUsageAboveThreshold", meta, cacheConfig); 178 MutableInt cachedCount = new MutableInt(0); 179 MutableInt unCachedCount = new MutableInt(0); 180 readStoreFile(storeFile, (r, o) -> { 181 HFileBlock block = null; 182 try { 183 block = r.readBlock(o, -1, false, true, false, true, null, null); 184 } catch (IOException e) { 185 fail(e.getMessage()); 186 } 187 return block; 188 }, (key, block) -> { 189 boolean isCached = blockCache.getBlock(key, true, false, true) != null; 190 if ( 191 block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX 192 || block.getBlockType() == BlockType.INTERMEDIATE_INDEX 193 ) { 194 if (isCached) { 195 cachedCount.increment(); 196 } else { 197 unCachedCount.increment(); 198 } 199 } 200 }, cacheConfig); 201 assertTrue(unCachedCount.compareTo(cachedCount) > 0); 202 } 203 204 @Test 205 public void testPrefetch() throws Exception { 206 TraceUtil.trace(() -> { 207 Path storeFile = writeStoreFile("TestPrefetch"); 208 readStoreFile(storeFile); 209 }, "testPrefetch"); 210 211 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<>(otelRule::getSpans, 212 hasItems(hasName("testPrefetch"), hasName("PrefetchExecutor.request")))); 213 final List<SpanData> spans = otelRule.getSpans(); 214 if (LOG.isDebugEnabled()) { 215 StringTraceRenderer renderer = new StringTraceRenderer(spans); 216 renderer.render(LOG::debug); 217 } 218 219 final SpanData testSpan = spans.stream().filter(hasName("testPrefetch")::matches).findFirst() 220 .orElseThrow(AssertionError::new); 221 assertThat("prefetch spans happen on their own threads, detached from file open.", spans, 222 hasItem(allOf(hasName("PrefetchExecutor.request"), not(hasParentSpanId(testSpan))))); 223 } 224 225 @Test 226 public void testPrefetchRace() throws Exception { 227 for (int i = 0; i < 10; i++) { 228 Path storeFile = writeStoreFile("TestPrefetchRace-" + i); 229 readStoreFileLikeScanner(storeFile); 230 } 231 } 232 233 /** 234 * Read a storefile in the same manner as a scanner -- using non-positional reads and without 235 * waiting for prefetch to complete. 236 */ 237 private void readStoreFileLikeScanner(Path storeFilePath) throws Exception { 238 // Open the file 239 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); 240 do { 241 long offset = 0; 242 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 243 HFileBlock block = 244 reader.readBlock(offset, -1, false, /* pread= */false, false, true, null, null); 245 offset += block.getOnDiskSizeWithHeader(); 246 } 247 } while (!reader.prefetchComplete()); 248 } 249 250 private void readStoreFile(Path storeFilePath) throws Exception { 251 readStoreFile(storeFilePath, (r, o) -> { 252 HFileBlock block = null; 253 try { 254 block = r.readBlock(o, -1, false, true, false, true, null, null); 255 } catch (IOException e) { 256 fail(e.getMessage()); 257 } 258 return block; 259 }, (key, block) -> { 260 boolean isCached = blockCache.getBlock(key, true, false, true) != null; 261 if ( 262 block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX 263 || block.getBlockType() == BlockType.INTERMEDIATE_INDEX 264 ) { 265 assertTrue(isCached); 266 } 267 }); 268 } 269 270 private void readStoreFileCacheOnly(Path storeFilePath) throws Exception { 271 readStoreFile(storeFilePath, (r, o) -> { 272 HFileBlock block = null; 273 try { 274 block = r.readBlock(o, -1, false, true, false, true, null, null, true); 275 } catch (IOException e) { 276 fail(e.getMessage()); 277 } 278 return block; 279 }, (key, block) -> { 280 boolean isCached = blockCache.getBlock(key, true, false, true) != null; 281 if (block.getBlockType() == BlockType.DATA) { 282 assertFalse(block.isUnpacked()); 283 } else if ( 284 block.getBlockType() == BlockType.ROOT_INDEX 285 || block.getBlockType() == BlockType.INTERMEDIATE_INDEX 286 ) { 287 assertTrue(block.isUnpacked()); 288 } 289 assertTrue(isCached); 290 }); 291 } 292 293 private void readStoreFile(Path storeFilePath, 294 BiFunction<HFile.Reader, Long, HFileBlock> readFunction, 295 BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception { 296 readStoreFile(storeFilePath, readFunction, validationFunction, cacheConf); 297 } 298 299 private void readStoreFile(Path storeFilePath, 300 BiFunction<HFile.Reader, Long, HFileBlock> readFunction, 301 BiConsumer<BlockCacheKey, HFileBlock> validationFunction, CacheConfig cacheConfig) 302 throws Exception { 303 // Open the file 304 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf); 305 306 while (!reader.prefetchComplete()) { 307 // Sleep for a bit 308 Thread.sleep(1000); 309 } 310 long offset = 0; 311 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 312 HFileBlock block = readFunction.apply(reader, offset); 313 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); 314 validationFunction.accept(blockCacheKey, block); 315 offset += block.getOnDiskSizeWithHeader(); 316 } 317 } 318 319 @Test 320 public void testPrefetchCompressed() throws Exception { 321 conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, true); 322 cacheConf = new CacheConfig(conf, blockCache); 323 HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ) 324 .withBlockSize(DATA_BLOCK_SIZE).build(); 325 Path storeFile = writeStoreFile("TestPrefetchCompressed", context); 326 readStoreFileCacheOnly(storeFile); 327 conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, false); 328 } 329 330 @Test 331 public void testPrefetchDoesntSkipRefs() throws Exception { 332 testPrefetchWhenRefs(false, c -> { 333 boolean isCached = c != null; 334 assertTrue(isCached); 335 }); 336 } 337 338 @Test 339 public void testOnConfigurationChange() { 340 PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf); 341 conf.setInt(PREFETCH_DELAY, 40000); 342 prefetchExecutorNotifier.onConfigurationChange(conf); 343 assertEquals(prefetchExecutorNotifier.getPrefetchDelay(), 40000); 344 345 // restore 346 conf.setInt(PREFETCH_DELAY, 30000); 347 prefetchExecutorNotifier.onConfigurationChange(conf); 348 assertEquals(prefetchExecutorNotifier.getPrefetchDelay(), 30000); 349 350 conf.setInt(PREFETCH_DELAY, 1000); 351 prefetchExecutorNotifier.onConfigurationChange(conf); 352 } 353 354 @Test 355 public void testPrefetchWithDelay() throws Exception { 356 // Configure custom delay 357 PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf); 358 conf.setInt(PREFETCH_DELAY, 25000); 359 conf.setFloat(PREFETCH_DELAY_VARIATION, 0.0f); 360 prefetchExecutorNotifier.onConfigurationChange(conf); 361 362 HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ) 363 .withBlockSize(DATA_BLOCK_SIZE).build(); 364 Path storeFile = writeStoreFile("TestPrefetchWithDelay", context); 365 long startTime = System.currentTimeMillis(); 366 HFile.Reader reader = HFile.createReader(fs, storeFile, cacheConf, true, conf); 367 368 // Wait for 20 seconds, no thread should start prefetch 369 Thread.sleep(20000); 370 assertFalse("Prefetch threads should not be running at this point", reader.prefetchStarted()); 371 long timeout = 10000; 372 Waiter.waitFor(conf, 10000, () -> (reader.prefetchStarted() || reader.prefetchComplete())); 373 374 assertTrue(reader.prefetchStarted() || reader.prefetchComplete()); 375 376 assertTrue("Prefetch should start post configured delay", 377 getElapsedTime(startTime) > PrefetchExecutor.getPrefetchDelay()); 378 379 conf.setInt(PREFETCH_DELAY, 1000); 380 conf.setFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE); 381 prefetchExecutorNotifier.onConfigurationChange(conf); 382 } 383 384 @Test 385 public void testPrefetchWhenNoBlockCache() throws Exception { 386 PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf); 387 try { 388 // Set a delay to max, as we don't need to have the thread running, but rather 389 // assert that it never gets scheduled 390 conf.setInt(PREFETCH_DELAY, Integer.MAX_VALUE); 391 conf.setFloat(PREFETCH_DELAY_VARIATION, 0.0f); 392 prefetchExecutorNotifier.onConfigurationChange(conf); 393 394 HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ) 395 .withBlockSize(DATA_BLOCK_SIZE).build(); 396 Path storeFile = writeStoreFile("testPrefetchWhenNoBlockCache", context); 397 HFile.createReader(fs, storeFile, new CacheConfig(conf), true, conf); 398 assertEquals(0, PrefetchExecutor.getPrefetchFutures().size()); 399 } finally { 400 conf.setInt(PREFETCH_DELAY, 1000); 401 conf.setFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE); 402 prefetchExecutorNotifier.onConfigurationChange(conf); 403 } 404 } 405 406 @Test 407 public void testPrefetchDoesntSkipHFileLink() throws Exception { 408 testPrefetchWhenHFileLink(c -> { 409 boolean isCached = c != null; 410 assertTrue(isCached); 411 }); 412 } 413 414 private void testPrefetchWhenRefs(boolean compactionEnabled, Consumer<Cacheable> test) 415 throws Exception { 416 cacheConf = new CacheConfig(conf, blockCache); 417 HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); 418 Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchSkipRefs"); 419 RegionInfo region = 420 RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchSkipRefs")).build(); 421 Path regionDir = new Path(tableDir, region.getEncodedName()); 422 Pair<Path, byte[]> fileWithSplitPoint = 423 writeStoreFileForSplit(new Path(regionDir, "cf"), context); 424 Path storeFile = fileWithSplitPoint.getFirst(); 425 HRegionFileSystem regionFS = 426 HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region); 427 StoreFileTracker sft = StoreFileTrackerFactory.create(conf, true, 428 StoreContext.getBuilder().withFamilyStoreDirectoryPath(new Path(regionDir, "cf")) 429 .withRegionFileSystem(regionFS).build()); 430 HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true, sft); 431 Path ref = regionFS.splitStoreFile(region, "cf", file, fileWithSplitPoint.getSecond(), false, 432 new ConstantSizeRegionSplitPolicy(), sft); 433 conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, compactionEnabled); 434 HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true, sft); 435 refHsf.initReader(); 436 HFile.Reader reader = refHsf.getReader().getHFileReader(); 437 while (!reader.prefetchComplete()) { 438 // Sleep for a bit 439 Thread.sleep(1000); 440 } 441 long offset = 0; 442 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 443 HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true); 444 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); 445 if (block.getBlockType() == BlockType.DATA) { 446 test.accept(blockCache.getBlock(blockCacheKey, true, false, true)); 447 } 448 offset += block.getOnDiskSizeWithHeader(); 449 } 450 } 451 452 private void testPrefetchWhenHFileLink(Consumer<Cacheable> test) throws Exception { 453 cacheConf = new CacheConfig(conf, blockCache); 454 HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); 455 Path testDir = TEST_UTIL.getDataTestDir("testPrefetchWhenHFileLink"); 456 final RegionInfo hri = 457 RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchWhenHFileLink")).build(); 458 // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/ 459 Configuration testConf = new Configuration(this.conf); 460 CommonFSUtils.setRootDir(testConf, testDir); 461 HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs, 462 CommonFSUtils.getTableDir(testDir, hri.getTable()), hri); 463 464 // Make a store file and write data to it. 465 StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs) 466 .withFilePath(regionFs.createTempName()).withFileContext(context).build(); 467 TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("testPrefetchWhenHFileLink"), 468 Bytes.toBytes("testPrefetchWhenHFileLink")); 469 470 Path storeFilePath = regionFs.commitStoreFile("cf", writer.getPath()); 471 final RegionInfo dstHri = 472 RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchWhenHFileLink")).build(); 473 HRegionFileSystem dstRegionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs, 474 CommonFSUtils.getTableDir(testDir, dstHri.getTable()), dstHri); 475 Path dstPath = new Path(regionFs.getTableDir(), new Path(dstHri.getRegionNameAsString(), "cf")); 476 HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName()); 477 Path linkFilePath = 478 new Path(dstPath, HFileLink.createHFileLinkName(hri, storeFilePath.getName())); 479 480 StoreFileTracker sft = StoreFileTrackerFactory.create(testConf, false, 481 StoreContext.getBuilder() 482 .withFamilyStoreDirectoryPath(new Path(dstRegionFs.getRegionDir(), "cf")) 483 .withRegionFileSystem(dstRegionFs).build()); 484 // Try to open store file from link 485 StoreFileInfo storeFileInfo = sft.getStoreFileInfo(linkFilePath, true); 486 HStoreFile hsf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf); 487 assertTrue(storeFileInfo.isLink()); 488 489 hsf.initReader(); 490 HFile.Reader reader = hsf.getReader().getHFileReader(); 491 while (!reader.prefetchComplete()) { 492 // Sleep for a bit 493 Thread.sleep(1000); 494 } 495 long offset = 0; 496 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 497 HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true); 498 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); 499 if (block.getBlockType() == BlockType.DATA) { 500 test.accept(blockCache.getBlock(blockCacheKey, true, false, true)); 501 } 502 offset += block.getOnDiskSizeWithHeader(); 503 } 504 } 505 506 private Path writeStoreFile(String fname) throws IOException { 507 HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); 508 return writeStoreFile(fname, meta); 509 } 510 511 private Path writeStoreFile(String fname, HFileContext context) throws IOException { 512 return writeStoreFile(fname, context, cacheConf); 513 } 514 515 private Path writeStoreFile(String fname, HFileContext context, CacheConfig cacheConfig) 516 throws IOException { 517 Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname); 518 StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConfig, fs) 519 .withOutputDir(storeFileParentDir).withFileContext(context).build(); 520 Random rand = ThreadLocalRandom.current(); 521 final int rowLen = 32; 522 for (int i = 0; i < NUM_KV; ++i) { 523 byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i); 524 byte[] v = RandomKeyValueUtil.randomValue(rand); 525 int cfLen = rand.nextInt(k.length - rowLen + 1); 526 KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen, 527 k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length); 528 sfw.append(kv); 529 } 530 531 sfw.close(); 532 return sfw.getPath(); 533 } 534 535 private Pair<Path, byte[]> writeStoreFileForSplit(Path storeDir, HFileContext context) 536 throws IOException { 537 StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(storeDir) 538 .withFileContext(context).build(); 539 Random rand = ThreadLocalRandom.current(); 540 final int rowLen = 32; 541 byte[] splitPoint = null; 542 for (int i = 0; i < NUM_KV; ++i) { 543 byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i); 544 byte[] v = RandomKeyValueUtil.randomValue(rand); 545 int cfLen = rand.nextInt(k.length - rowLen + 1); 546 KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen, 547 k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length); 548 sfw.append(kv); 549 if (i == NUM_KV / 2) { 550 splitPoint = k; 551 } 552 } 553 sfw.close(); 554 return new Pair(sfw.getPath(), splitPoint); 555 } 556 557 public static KeyValue.Type generateKeyType(Random rand) { 558 if (rand.nextBoolean()) { 559 // Let's make half of KVs puts. 560 return KeyValue.Type.Put; 561 } else { 562 KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; 563 if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { 564 throw new RuntimeException("Generated an invalid key type: " + keyType + ". " 565 + "Probably the layout of KeyValue.Type has changed."); 566 } 567 return keyType; 568 } 569 } 570 571 private long getElapsedTime(long startTime) { 572 return System.currentTimeMillis() - startTime; 573 } 574}