/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION_DEFAULT_VALUE;
import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.PrefetchExecutorNotifier;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.TestHStoreFile;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for prefetch-on-open of HFile blocks: store files are written with random KeyValues,
 * reopened with {@link CacheConfig#PREFETCH_BLOCKS_ON_OPEN_KEY} enabled, and the block cache is
 * then inspected block by block.
 * <p>
 * The {@link Configuration} used here is the one owned by the static {@link HBaseTestingUtility},
 * so it is shared by every test method in this class. Tests that change it restore the previous
 * state in a {@code finally} block so that a failing assertion cannot leak settings into the tests
 * that run afterwards.
 */
@Category({ IOTests.class, MediumTests.class })
public class TestPrefetch {
  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetch.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestPrefetch.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  /** Number of entries of {@link KeyValue.Type} that {@link #generateKeyType} may pick from. */
  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  private static final int DATA_BLOCK_SIZE = 2048;
  private static final int NUM_KV = 1000;
  /** Number of leading bytes of each generated key that are used as the row. */
  private static final int ROW_LEN = 32;
  /** How long to sleep between two checks of {@code HFile.Reader#prefetchComplete()}. */
  private static final long PREFETCH_POLL_INTERVAL_MS = 1000;

  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private BlockCache blockCache;

  @Rule
  public OpenTelemetryRule otelRule = OpenTelemetryRule.create();

  /**
   * Enables prefetch-on-open in the shared configuration and builds a fresh block cache and
   * {@link CacheConfig} for the test about to run.
   */
  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
    fs = HFileSystem.get(conf);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
  }

  /**
   * Prefetch requested on the column family descriptor must be honoured by {@link CacheConfig}
   * even though the configuration itself does not enable it.
   */
  @Test
  public void testPrefetchSetInHCDWorks() {
    ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
      .newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true).build();
    Configuration c = HBaseConfiguration.create();
    assertFalse(c.getBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, false));
    CacheConfig cc = new CacheConfig(c, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP);
    assertTrue(cc.shouldPrefetchOnOpen());
  }

  /**
   * With the block cache disabled on the column family, data and index blocks must not show up in
   * the cache, and the sum of completed plus queued tasks on the prefetch executor pool must be
   * the same before and after the file is read.
   */
  @Test
  public void testPrefetchBlockCacheDisabled() throws Exception {
    ScheduledThreadPoolExecutor poolExecutor =
      (ScheduledThreadPoolExecutor) PrefetchExecutor.getExecutorPool();
    long totalCompletedBefore = poolExecutor.getCompletedTaskCount();
    long queueBefore = poolExecutor.getQueue().size();
    ColumnFamilyDescriptor columnFamilyDescriptor =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true)
        .setBlockCacheEnabled(false).build();
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    CacheConfig cacheConfig =
      new CacheConfig(conf, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP);
    Path storeFile = writeStoreFile("testPrefetchBlockCacheDisabled", meta, cacheConfig);
    readStoreFile(storeFile, this::preadBlock, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (isDataOrIndexBlock(block)) {
        assertFalse(isCached);
      }
    }, cacheConfig);
    assertEquals(totalCompletedBefore + queueBefore,
      poolExecutor.getCompletedTaskCount() + poolExecutor.getQueue().size());
  }

  /**
   * Writes and reads back a store file inside a "testPrefetch" span, then checks that a
   * "PrefetchExecutor.request" span was recorded whose parent is not the test span.
   */
  @Test
  public void testPrefetch() throws Exception {
    TraceUtil.trace(() -> {
      Path storeFile = writeStoreFile("TestPrefetch");
      readStoreFile(storeFile);
    }, "testPrefetch");

    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<>(otelRule::getSpans,
      hasItems(hasName("testPrefetch"), hasName("PrefetchExecutor.request"))));
    final List<SpanData> spans = otelRule.getSpans();
    if (LOG.isDebugEnabled()) {
      StringTraceRenderer renderer = new StringTraceRenderer(spans);
      renderer.render(LOG::debug);
    }

    final SpanData testSpan = spans.stream().filter(hasName("testPrefetch")::matches).findFirst()
      .orElseThrow(AssertionError::new);
    assertThat("prefetch spans happen on their own threads, detached from file open.", spans,
      hasItem(allOf(hasName("PrefetchExecutor.request"), not(hasParentSpanId(testSpan)))));
  }

  /**
   * Repeatedly reads freshly written store files while their prefetch may still be in progress.
   * The test passes as long as no read throws.
   */
  @Test
  public void testPrefetchRace() throws Exception {
    for (int i = 0; i < 10; i++) {
      Path storeFile = writeStoreFile("TestPrefetchRace-" + i);
      readStoreFileLikeScanner(storeFile);
    }
  }

  /**
   * Read a storefile in the same manner as a scanner -- using non-positional reads and without
   * waiting for prefetch to complete.
   */
  private void readStoreFileLikeScanner(Path storeFilePath) throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
    // Keep walking the whole file until the reader reports that prefetch has completed.
    do {
      long offset = 0;
      while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
        HFileBlock block =
          reader.readBlock(offset, -1, false, /* pread= */false, false, true, null, null);
        offset += block.getOnDiskSizeWithHeader();
      }
    } while (!reader.prefetchComplete());
  }

  /**
   * Reads every block of the store file after prefetch has completed and asserts that each data
   * and index block is present in the block cache.
   */
  private void readStoreFile(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, this::preadBlock, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (isDataOrIndexBlock(block)) {
        assertTrue(isCached);
      }
    });
  }

  /**
   * Reads every block of the store file with the {@code cacheOnly} flag set and asserts that each
   * block is present in the block cache, that data blocks are returned still packed and that
   * index blocks are returned unpacked.
   */
  private void readStoreFileCacheOnly(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, this::preadBlockCacheOnly, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (block.getBlockType() == BlockType.DATA) {
        assertFalse(block.isUnpacked());
      } else if (isIndexBlock(block)) {
        assertTrue(block.isUnpacked());
      }
      assertTrue(isCached);
    });
  }

  /**
   * Same as {@link #readStoreFile(Path, BiFunction, BiConsumer, CacheConfig)} using this test's
   * default {@link #cacheConf}.
   */
  private void readStoreFile(Path storeFilePath,
    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
    BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
    readStoreFile(storeFilePath, readFunction, validationFunction, cacheConf);
  }

  /**
   * Opens the store file, waits until the reader reports prefetch as complete, then walks the
   * file from offset 0 up to the load-on-open section.
   * @param storeFilePath      file to open
   * @param readFunction       reads the block starting at the given offset
   * @param validationFunction invoked for every block with its cache key and the block read
   * @param cacheConfig        cache configuration the reader is created with
   */
  private void readStoreFile(Path storeFilePath,
    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
    BiConsumer<BlockCacheKey, HFileBlock> validationFunction, CacheConfig cacheConfig)
    throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);
    waitForPrefetch(reader);
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = readFunction.apply(reader, offset);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      validationFunction.accept(blockCacheKey, block);
      offset += block.getOnDiskSizeWithHeader();
    }
  }

  /**
   * With {@link CacheConfig#CACHE_DATA_BLOCKS_COMPRESSED_KEY} enabled and a GZ-compressed file,
   * verifies the expectations of {@link #readStoreFileCacheOnly(Path)}.
   */
  @Test
  public void testPrefetchCompressed() throws Exception {
    conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
    try {
      cacheConf = new CacheConfig(conf, blockCache);
      HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
        .withBlockSize(DATA_BLOCK_SIZE).build();
      Path storeFile = writeStoreFile("TestPrefetchCompressed", context);
      readStoreFileCacheOnly(storeFile);
    } finally {
      // The configuration is shared between tests; reset it even when an assertion failed.
      conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
    }
  }

  /** With compactions enabled, data blocks of a reference file must not be in the cache. */
  @Test
  public void testPrefetchSkipsRefs() throws Exception {
    testPrefetchWhenRefs(true, c -> {
      boolean isCached = c != null;
      assertFalse(isCached);
    });
  }

  /** With compactions disabled, data blocks of a reference file must be in the cache. */
  @Test
  public void testPrefetchDoesntSkipRefs() throws Exception {
    testPrefetchWhenRefs(false, c -> {
      boolean isCached = c != null;
      assertTrue(isCached);
    });
  }

  /**
   * Changing {@link PrefetchExecutor#PREFETCH_DELAY} and notifying the
   * {@link PrefetchExecutorNotifier} must be reflected by its reported prefetch delay.
   */
  @Test
  public void testOnConfigurationChange() {
    PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
    try {
      conf.setInt(PREFETCH_DELAY, 40000);
      prefetchExecutorNotifier.onConfigurationChange(conf);
      assertEquals(40000, prefetchExecutorNotifier.getPrefetchDelay());

      // A second change must be picked up as well.
      conf.setInt(PREFETCH_DELAY, 30000);
      prefetchExecutorNotifier.onConfigurationChange(conf);
      assertEquals(30000, prefetchExecutorNotifier.getPrefetchDelay());
    } finally {
      // Restore the delay used by the other tests, even when an assertion above failed.
      conf.setInt(PREFETCH_DELAY, 1000);
      prefetchExecutorNotifier.onConfigurationChange(conf);
    }
  }

  /**
   * Configures a 25 second prefetch delay without variation and checks that prefetch has not
   * started 20 seconds after the file was opened, and that it starts only once the configured
   * delay has elapsed.
   */
  @Test
  public void testPrefetchWithDelay() throws Exception {
    // Configure custom delay
    PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
    conf.setInt(PREFETCH_DELAY, 25000);
    conf.setFloat(PREFETCH_DELAY_VARIATION, 0.0f);
    try {
      prefetchExecutorNotifier.onConfigurationChange(conf);

      HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
        .withBlockSize(DATA_BLOCK_SIZE).build();
      Path storeFile = writeStoreFile("TestPrefetchWithDelay", context);
      HFile.Reader reader = HFile.createReader(fs, storeFile, cacheConf, true, conf);
      long startTime = System.currentTimeMillis();

      // Wait for 20 seconds, no thread should start prefetch
      Thread.sleep(20000);
      assertFalse("Prefetch threads should not be running at this point", reader.prefetchStarted());
      while (!reader.prefetchStarted()) {
        assertTrue("Prefetch delay has not been expired yet",
          getElapsedTime(startTime) < PrefetchExecutor.getPrefetchDelay());
      }
      if (reader.prefetchStarted()) {
        // Added some delay as we have started the timer a bit late.
        Thread.sleep(500);
        assertTrue("Prefetch should start post configured delay",
          getElapsedTime(startTime) > PrefetchExecutor.getPrefetchDelay());
      }
    } finally {
      // Restore the settings used by the other tests, even when an assertion above failed;
      // otherwise every later test in this class would wait for the 25 second delay.
      conf.setInt(PREFETCH_DELAY, 1000);
      conf.setFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
      prefetchExecutorNotifier.onConfigurationChange(conf);
    }
  }

  /** Data blocks of a file opened through an {@link HFileLink} must be in the cache. */
  @Test
  public void testPrefetchDoesntSkipHFileLink() throws Exception {
    testPrefetchWhenHFileLink(c -> {
      boolean isCached = c != null;
      assertTrue(isCached);
    });
  }

  /**
   * Writes a store file, splits it to obtain a reference file, opens the reference with the given
   * compaction setting and hands the cache entry of every data block to {@code test}.
   * @param compactionEnabled value to set for {@code HBASE_REGION_SERVER_ENABLE_COMPACTION}
   *                          before the reference file is opened
   * @param test              receives the cached block for each data block, or whatever the cache
   *                          returns when the block is absent
   */
  private void testPrefetchWhenRefs(boolean compactionEnabled, Consumer<Cacheable> test)
    throws Exception {
    cacheConf = new CacheConfig(conf, blockCache);
    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchSkipRefs");
    RegionInfo region =
      RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchSkipRefs")).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Pair<Path, byte[]> fileWithSplitPoint =
      writeStoreFileForSplit(new Path(regionDir, "cf"), context);
    Path storeFile = fileWithSplitPoint.getFirst();
    HRegionFileSystem regionFS =
      HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region);
    HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true);
    Path ref = regionFS.splitStoreFile(region, "cf", file, fileWithSplitPoint.getSecond(), false,
      new ConstantSizeRegionSplitPolicy());
    // Remember the current setting so the shared configuration can be put back afterwards.
    String previousCompactionSetting = conf.get(HBASE_REGION_SERVER_ENABLE_COMPACTION);
    conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, compactionEnabled);
    try {
      HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true);
      refHsf.initReader();
      checkDataBlocksInCache(refHsf.getReader().getHFileReader(), test);
    } finally {
      if (previousCompactionSetting == null) {
        conf.unset(HBASE_REGION_SERVER_ENABLE_COMPACTION);
      } else {
        conf.set(HBASE_REGION_SERVER_ENABLE_COMPACTION, previousCompactionSetting);
      }
    }
  }

  /**
   * Writes a store file, creates an {@link HFileLink} pointing at it, opens the link and hands
   * the cache entry of every data block to {@code test}.
   */
  private void testPrefetchWhenHFileLink(Consumer<Cacheable> test) throws Exception {
    cacheConf = new CacheConfig(conf, blockCache);
    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    Path testDir = TEST_UTIL.getDataTestDir("testPrefetchWhenHFileLink");
    final RegionInfo hri =
      RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchWhenHFileLink")).build();
    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
    Configuration testConf = new Configuration(this.conf);
    CommonFSUtils.setRootDir(testConf, testDir);
    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
      CommonFSUtils.getTableDir(testDir, hri.getTable()), hri);

    // Make a store file and write data to it.
    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
      .withFilePath(regionFs.createTempName()).withFileContext(context).build();
    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("testPrefetchWhenHFileLink"),
      Bytes.toBytes("testPrefetchWhenHFileLink"));

    Path storeFilePath = regionFs.commitStoreFile("cf", writer.getPath());
    Path dstPath = new Path(regionFs.getTableDir(), new Path("test-region", "cf"));
    HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
    Path linkFilePath =
      new Path(dstPath, HFileLink.createHFileLinkName(hri, storeFilePath.getName()));

    // Try to open store file from link
    StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath, true);
    HStoreFile hsf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
    assertTrue(storeFileInfo.isLink());

    hsf.initReader();
    checkDataBlocksInCache(hsf.getReader().getHFileReader(), test);
  }

  /**
   * Waits for prefetch to complete on {@code reader}, then walks all blocks before the
   * load-on-open section and passes the block cache lookup result of every data block to
   * {@code test}.
   */
  private void checkDataBlocksInCache(HFile.Reader reader, Consumer<Cacheable> test)
    throws Exception {
    waitForPrefetch(reader);
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      if (block.getBlockType() == BlockType.DATA) {
        test.accept(blockCache.getBlock(blockCacheKey, true, false, true));
      }
      offset += block.getOnDiskSizeWithHeader();
    }
  }

  /** Blocks the calling thread, polling once per interval, until prefetch is complete. */
  private static void waitForPrefetch(HFile.Reader reader) throws InterruptedException {
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(PREFETCH_POLL_INTERVAL_MS);
    }
  }

  /** Reads the block at {@code offset} with a positional read; fails the test on I/O errors. */
  private HFileBlock preadBlock(HFile.Reader reader, long offset) {
    HFileBlock block = null;
    try {
      block = reader.readBlock(offset, -1, false, true, false, true, null, null);
    } catch (IOException e) {
      failBlockRead(offset, e);
    }
    return block;
  }

  /**
   * Same as {@link #preadBlock(HFile.Reader, long)} but passes {@code true} for the trailing
   * {@code cacheOnly} argument of {@code readBlock}.
   */
  private HFileBlock preadBlockCacheOnly(HFile.Reader reader, long offset) {
    HFileBlock block = null;
    try {
      block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
    } catch (IOException e) {
      failBlockRead(offset, e);
    }
    return block;
  }

  /**
   * Logs the exception with its stack trace, which a bare {@code fail(e.getMessage())} would
   * drop, and then fails the running test. Always throws.
   */
  private static void failBlockRead(long offset, IOException e) {
    LOG.error("Failed to read block at offset {}", offset, e);
    fail("Failed to read block at offset " + offset + ": " + e);
  }

  /** Returns true for root and intermediate index blocks. */
  private static boolean isIndexBlock(HFileBlock block) {
    return block.getBlockType() == BlockType.ROOT_INDEX
      || block.getBlockType() == BlockType.INTERMEDIATE_INDEX;
  }

  /** Returns true for data blocks and for root and intermediate index blocks. */
  private static boolean isDataOrIndexBlock(HFileBlock block) {
    return block.getBlockType() == BlockType.DATA || isIndexBlock(block);
  }

  /** Writes a store file with the default block size and this test's {@link #cacheConf}. */
  private Path writeStoreFile(String fname) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(fname, meta);
  }

  /** Writes a store file with the given file context and this test's {@link #cacheConf}. */
  private Path writeStoreFile(String fname, HFileContext context) throws IOException {
    return writeStoreFile(fname, context, cacheConf);
  }

  /**
   * Writes {@link #NUM_KV} random KeyValues into a new store file below a directory named
   * {@code fname} in the test data directory.
   * @return path of the written store file
   */
  private Path writeStoreFile(String fname, HFileContext context, CacheConfig cacheConfig)
    throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConfig, fs)
      .withOutputDir(storeFileParentDir).withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      sfw.append(randomKeyValue(rand, k, v));
    }

    sfw.close();
    return sfw.getPath();
  }

  /**
   * Writes {@link #NUM_KV} random KeyValues into a new store file in {@code storeDir}.
   * @return the path of the written file together with the key generated for the middle entry,
   *         which callers use as the split point
   */
  private Pair<Path, byte[]> writeStoreFileForSplit(Path storeDir, HFileContext context)
    throws IOException {
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(storeDir)
      .withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    byte[] splitPoint = null;
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      sfw.append(randomKeyValue(rand, k, v));
      if (i == NUM_KV / 2) {
        splitPoint = k;
      }
    }
    sfw.close();
    return new Pair<>(sfw.getPath(), splitPoint);
  }

  /**
   * Builds a KeyValue out of the key bytes {@code k}: the first {@link #ROW_LEN} bytes become the
   * row, a random number of the following bytes the family and the remainder the qualifier.
   * Timestamp and type are random, {@code v} is the value.
   */
  private static KeyValue randomKeyValue(Random rand, byte[] k, byte[] v) {
    int cfLen = rand.nextInt(k.length - ROW_LEN + 1);
    return new KeyValue(k, 0, ROW_LEN, k, ROW_LEN, cfLen, k, ROW_LEN + cfLen,
      k.length - ROW_LEN - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
  }

  /**
   * Picks a random key type: {@link KeyValue.Type#Put} half of the time, otherwise one of the
   * entries at index {@code 1 .. NUM_VALID_KEY_TYPES} of {@code KeyValue.Type.values()}.
   * @throws RuntimeException if the pick is {@code Minimum} or {@code Maximum}, which indicates
   *                          that the layout of {@link KeyValue.Type} has changed
   */
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

  /** Returns the number of milliseconds elapsed since {@code startTime}. */
  private long getElapsedTime(long startTime) {
    return System.currentTimeMillis() - startTime;
  }
}