/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION_DEFAULT_VALUE;
import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.PrefetchExecutorNotifier;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.TestHStoreFile;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ IOTests.class, MediumTests.class })
public class TestPrefetch {
  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetch.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestPrefetch.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  // All KeyValue types except the internal Minimum and Maximum markers.
  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
  private static final int DATA_BLOCK_SIZE = 2048;
  private static final int NUM_KV = 1000;
  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private BlockCache blockCache;

  @Rule
  public OpenTelemetryRule otelRule = OpenTelemetryRule.create();

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
    fs = HFileSystem.get(conf);
    blockCache = BlockCacheFactory.createBlockCache(conf);
    cacheConf = new CacheConfig(conf, blockCache);
  }

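  /**
   * Prefetch-on-open set on the column family descriptor should be honored by CacheConfig even
   * when the configuration default is false.
   */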
  @Test
  public void testPrefetchSetInHCDWorks() {
    ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
      .newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true).build();
    Configuration c = HBaseConfiguration.create();
    assertFalse(c.getBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, false));
    CacheConfig cc = new CacheConfig(c, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP);
    assertTrue(cc.shouldPrefetchOnOpen());
  }

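  /**
   * When block caching is disabled on the column family, opening the store file should not
   * schedule any prefetch work, and none of the read blocks should end up in the block cache.
   */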
  @Test
  public void testPrefetchBlockCacheDisabled() throws Exception {
    ScheduledThreadPoolExecutor poolExecutor =
      (ScheduledThreadPoolExecutor) PrefetchExecutor.getExecutorPool();
    long totalCompletedBefore = poolExecutor.getCompletedTaskCount();
    long queueBefore = poolExecutor.getQueue().size();
    ColumnFamilyDescriptor columnFamilyDescriptor =
      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true)
        .setBlockCacheEnabled(false).build();
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    CacheConfig cacheConfig =
      new CacheConfig(conf, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP);
    Path storeFile = writeStoreFile("testPrefetchBlockCacheDisabled", meta, cacheConfig);
    readStoreFile(storeFile, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (
        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertFalse(isCached);
      }
    }, cacheConfig);
    assertEquals(totalCompletedBefore + queueBefore,
      poolExecutor.getCompletedTaskCount() + poolExecutor.getQueue().size());
  }

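  /**
   * Basic prefetch-on-open: after the reader is opened, data and index blocks should be found in
   * the block cache. Also verifies that the prefetch work runs in its own trace span, detached
   * from the span that opened the file.
   */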
  @Test
  public void testPrefetch() throws Exception {
    TraceUtil.trace(() -> {
      Path storeFile = writeStoreFile("TestPrefetch");
      readStoreFile(storeFile);
    }, "testPrefetch");

    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<>(otelRule::getSpans,
      hasItems(hasName("testPrefetch"), hasName("PrefetchExecutor.request"))));
    final List<SpanData> spans = otelRule.getSpans();
    if (LOG.isDebugEnabled()) {
      StringTraceRenderer renderer = new StringTraceRenderer(spans);
      renderer.render(LOG::debug);
    }

    final SpanData testSpan = spans.stream().filter(hasName("testPrefetch")::matches).findFirst()
      .orElseThrow(AssertionError::new);
    assertThat("prefetch spans happen on their own threads, detached from file open.", spans,
      hasItem(allOf(hasName("PrefetchExecutor.request"), not(hasParentSpanId(testSpan)))));
  }

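  /**
   * Repeatedly writes store files and reads them back scanner-style, exercising the race between
   * the prefetch thread and a concurrent reader of the same blocks.
   */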
  @Test
  public void testPrefetchRace() throws Exception {
    for (int i = 0; i < 10; i++) {
      Path storeFile = writeStoreFile("TestPrefetchRace-" + i);
      readStoreFileLikeScanner(storeFile);
    }
  }

  /**
   * Read a storefile in the same manner as a scanner -- using non-positional reads and without
   * waiting for prefetch to complete.
   */
  private void readStoreFileLikeScanner(Path storeFilePath) throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
    do {
      long offset = 0;
      while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
        HFileBlock block =
          reader.readBlock(offset, -1, false, /* pread= */false, false, true, null, null);
        offset += block.getOnDiskSizeWithHeader();
      }
    } while (!reader.prefetchComplete());
  }

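  /**
   * Read a store file block by block after prefetch completes and assert that data and index
   * blocks were cached by the prefetch.
   */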
  private void readStoreFile(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (
        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertTrue(isCached);
      }
    });
  }

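  /**
   * Like {@link #readStoreFile(Path)}, but asserts that cached data blocks remain packed (not
   * unpacked) while index blocks are unpacked, and that all of these blocks are cached.
   */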
  private void readStoreFileCacheOnly(Path storeFilePath) throws Exception {
    readStoreFile(storeFilePath, (r, o) -> {
      HFileBlock block = null;
      try {
        block = r.readBlock(o, -1, false, true, false, true, null, null, true);
      } catch (IOException e) {
        fail(e.getMessage());
      }
      return block;
    }, (key, block) -> {
      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
      if (block.getBlockType() == BlockType.DATA) {
        assertFalse(block.isUnpacked());
      } else if (
        block.getBlockType() == BlockType.ROOT_INDEX
          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
      ) {
        assertTrue(block.isUnpacked());
      }
      assertTrue(isCached);
    });
  }

  private void readStoreFile(Path storeFilePath,
    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
    BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
    readStoreFile(storeFilePath, readFunction, validationFunction, cacheConf);
  }

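  /**
   * Open the store file, wait for prefetch to complete, then walk the blocks before the
   * load-on-open section, applying the given read and validation functions at each block offset.
   */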
  private void readStoreFile(Path storeFilePath,
    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
    BiConsumer<BlockCacheKey, HFileBlock> validationFunction, CacheConfig cacheConfig)
    throws Exception {
    // Open the file
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);

    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = readFunction.apply(reader, offset);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      validationFunction.accept(blockCacheKey, block);
      offset += block.getOnDiskSizeWithHeader();
    }
  }

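  /**
   * With CACHE_DATA_BLOCKS_COMPRESSED_KEY enabled, prefetched data blocks should be cached in
   * their packed (still compressed) form.
   */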
  @Test
  public void testPrefetchCompressed() throws Exception {
    conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
    cacheConf = new CacheConfig(conf, blockCache);
    HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
      .withBlockSize(DATA_BLOCK_SIZE).build();
    Path storeFile = writeStoreFile("TestPrefetchCompressed", context);
    readStoreFileCacheOnly(storeFile);
    conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
  }

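  /**
   * With compactions enabled, prefetch should skip reference files, so data blocks read through
   * the reference are not found in the block cache.
   */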
  @Test
  public void testPrefetchSkipsRefs() throws Exception {
    testPrefetchWhenRefs(true, c -> {
      boolean isCached = c != null;
      assertFalse(isCached);
    });
  }

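  /**
   * With compactions disabled, prefetch should not skip reference files, so data blocks read
   * through the reference are found in the block cache.
   */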
  @Test
  public void testPrefetchDoesntSkipRefs() throws Exception {
    testPrefetchWhenRefs(false, c -> {
      boolean isCached = c != null;
      assertTrue(isCached);
    });
  }

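  /**
   * Changing the prefetch delay via a configuration update should be picked up by the
   * PrefetchExecutorNotifier.
   */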
  @Test
  public void testOnConfigurationChange() {
    PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
    conf.setInt(PREFETCH_DELAY, 40000);
    prefetchExecutorNotifier.onConfigurationChange(conf);
    assertEquals(40000, prefetchExecutorNotifier.getPrefetchDelay());

    // restore
    conf.setInt(PREFETCH_DELAY, 30000);
    prefetchExecutorNotifier.onConfigurationChange(conf);
    assertEquals(30000, prefetchExecutorNotifier.getPrefetchDelay());

    conf.setInt(PREFETCH_DELAY, 1000);
    prefetchExecutorNotifier.onConfigurationChange(conf);
  }

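  /**
   * With a 25 second prefetch delay and no delay variation configured, prefetch should not start
   * before the delay elapses and should start shortly after it does.
   */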
  @Test
  public void testPrefetchWithDelay() throws Exception {
    // Configure a custom delay with no random variation so the timing is deterministic.
    PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
    conf.setInt(PREFETCH_DELAY, 25000);
    conf.setFloat(PREFETCH_DELAY_VARIATION, 0.0f);
    prefetchExecutorNotifier.onConfigurationChange(conf);

    HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
      .withBlockSize(DATA_BLOCK_SIZE).build();
    Path storeFile = writeStoreFile("TestPrefetchWithDelay", context);
    HFile.Reader reader = HFile.createReader(fs, storeFile, cacheConf, true, conf);
    long startTime = System.currentTimeMillis();

    // Wait for 20 seconds; no thread should have started prefetching yet.
    Thread.sleep(20000);
    assertFalse("Prefetch threads should not be running at this point", reader.prefetchStarted());
    while (!reader.prefetchStarted()) {
      assertTrue("Prefetch delay has not expired yet",
        getElapsedTime(startTime) < PrefetchExecutor.getPrefetchDelay());
    }
    if (reader.prefetchStarted()) {
      // Wait a little, since we started our timer slightly after the reader was opened.
      Thread.sleep(500);
      assertTrue("Prefetch should start only after the configured delay",
        getElapsedTime(startTime) > PrefetchExecutor.getPrefetchDelay());
    }
    conf.setInt(PREFETCH_DELAY, 1000);
    conf.setFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
    prefetchExecutorNotifier.onConfigurationChange(conf);
  }

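  /**
   * Prefetch should follow HFileLinks: data blocks read through the link should be found in the
   * block cache.
   */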
  @Test
  public void testPrefetchDoesntSkipHFileLink() throws Exception {
    testPrefetchWhenHFileLink(c -> {
      boolean isCached = c != null;
      assertTrue(isCached);
    });
  }

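  /**
   * Writes a store file, splits it to produce a reference file, opens the reference with the
   * given compaction setting, and applies the supplied check to each data block looked up in the
   * block cache.
   */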
  private void testPrefetchWhenRefs(boolean compactionEnabled, Consumer<Cacheable> test)
    throws Exception {
    cacheConf = new CacheConfig(conf, blockCache);
    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchSkipRefs");
    RegionInfo region =
      RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchSkipRefs")).build();
    Path regionDir = new Path(tableDir, region.getEncodedName());
    Pair<Path, byte[]> fileWithSplitPoint =
      writeStoreFileForSplit(new Path(regionDir, "cf"), context);
    Path storeFile = fileWithSplitPoint.getFirst();
    HRegionFileSystem regionFS =
      HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region);
    HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true);
    Path ref = regionFS.splitStoreFile(region, "cf", file, fileWithSplitPoint.getSecond(), false,
      new ConstantSizeRegionSplitPolicy());
    conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, compactionEnabled);
    HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true);
    refHsf.initReader();
    HFile.Reader reader = refHsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      if (block.getBlockType() == BlockType.DATA) {
        test.accept(blockCache.getBlock(blockCacheKey, true, false, true));
      }
      offset += block.getOnDiskSizeWithHeader();
    }
  }

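  /**
   * Writes a store file, creates an HFileLink pointing at it, opens the store file through the
   * link, and applies the supplied check to each data block looked up in the block cache.
   */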
  private void testPrefetchWhenHFileLink(Consumer<Cacheable> test) throws Exception {
    cacheConf = new CacheConfig(conf, blockCache);
    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    Path testDir = TEST_UTIL.getDataTestDir("testPrefetchWhenHFileLink");
    final RegionInfo hri =
      RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchWhenHFileLink")).build();
    // force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
    Configuration testConf = new Configuration(this.conf);
    CommonFSUtils.setRootDir(testConf, testDir);
    HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
      CommonFSUtils.getTableDir(testDir, hri.getTable()), hri);

    // Make a store file and write data to it.
    StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
      .withFilePath(regionFs.createTempName()).withFileContext(context).build();
    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("testPrefetchWhenHFileLink"),
      Bytes.toBytes("testPrefetchWhenHFileLink"));

    Path storeFilePath = regionFs.commitStoreFile("cf", writer.getPath());
    Path dstPath = new Path(regionFs.getTableDir(), new Path("test-region", "cf"));
    HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
    Path linkFilePath =
      new Path(dstPath, HFileLink.createHFileLinkName(hri, storeFilePath.getName()));

    // Try to open store file from link
    StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath, true);
    HStoreFile hsf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
    assertTrue(storeFileInfo.isLink());

    hsf.initReader();
    HFile.Reader reader = hsf.getReader().getHFileReader();
    while (!reader.prefetchComplete()) {
      // Sleep for a bit
      Thread.sleep(1000);
    }
    long offset = 0;
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      if (block.getBlockType() == BlockType.DATA) {
        test.accept(blockCache.getBlock(blockCacheKey, true, false, true));
      }
      offset += block.getOnDiskSizeWithHeader();
    }
  }

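  /**
   * Write a store file named {@code fname} containing {@link #NUM_KV} random KeyValues under the
   * test data directory and return its path.
   */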
  private Path writeStoreFile(String fname) throws IOException {
    HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
    return writeStoreFile(fname, meta);
  }

  private Path writeStoreFile(String fname, HFileContext context) throws IOException {
    return writeStoreFile(fname, context, cacheConf);
  }

  private Path writeStoreFile(String fname, HFileContext context, CacheConfig cacheConfig)
    throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConfig, fs)
      .withOutputDir(storeFileParentDir).withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
    }

    sfw.close();
    return sfw.getPath();
  }

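  /**
   * Write a store file of {@link #NUM_KV} random KeyValues and return its path together with the
   * key written at the file's midpoint, for use as a split point.
   */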
  private Pair<Path, byte[]> writeStoreFileForSplit(Path storeDir, HFileContext context)
    throws IOException {
    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(storeDir)
      .withFileContext(context).build();
    Random rand = ThreadLocalRandom.current();
    final int rowLen = 32;
    byte[] splitPoint = null;
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] v = RandomKeyValueUtil.randomValue(rand);
      int cfLen = rand.nextInt(k.length - rowLen + 1);
      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen,
        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 0, v.length);
      sfw.append(kv);
      if (i == NUM_KV / 2) {
        splitPoint = k;
      }
    }
    sfw.close();
    return new Pair<>(sfw.getPath(), splitPoint);
  }

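  /**
   * Randomly pick a KeyValue type for a generated cell: Put about half the time, otherwise one of
   * the other valid types (never Minimum or Maximum).
   */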
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

  private long getElapsedTime(long startTime) {
    return System.currentTimeMillis() - startTime;
  }
}