/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Tests {@link HFile} cache-on-write functionality for the following block types: data blocks,
 * non-root index blocks, and Bloom filter blocks.
 */
@RunWith(Parameterized.class)
@Category({ IOTests.class, LargeTests.class })
public class TestCacheOnWrite {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCacheOnWrite.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestCacheOnWrite.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private Configuration conf;
  private CacheConfig cacheConf;
  private FileSystem fs;
  private Random rand = new Random(12983177L);
  private Path storeFilePath;
  private BlockCache blockCache;
  private String testDescription;

  private final CacheOnWriteType cowType;
  private final Compression.Algorithm compress;
  private final boolean cacheCompressedData;

  private static final int DATA_BLOCK_SIZE = 2048;
  private static final int NUM_KV = 25000;
  private static final int INDEX_BLOCK_SIZE = 512;
  private static final int BLOOM_BLOCK_SIZE = 4096;
  private static final BloomType BLOOM_TYPE = BloomType.ROWCOL;
  private static final int CKBYTES = 512;

  private static final Set<BlockType> INDEX_BLOCK_TYPES = ImmutableSet.of(BlockType.INDEX_V1,
    BlockType.INTERMEDIATE_INDEX, BlockType.ROOT_INDEX, BlockType.LEAF_INDEX);
  private static final Set<BlockType> BLOOM_BLOCK_TYPES = ImmutableSet.of(BlockType.BLOOM_CHUNK,
    BlockType.GENERAL_BLOOM_META, BlockType.DELETE_FAMILY_BLOOM_META);
  private static final Set<BlockType> DATA_BLOCK_TYPES =
    ImmutableSet.of(BlockType.ENCODED_DATA, BlockType.DATA);

  // All test cases are supposed to generate files for compaction within this range
  private static final long CACHE_COMPACTION_LOW_THRESHOLD = 10L;
  private static final long CACHE_COMPACTION_HIGH_THRESHOLD = 1 * 1024 * 1024 * 1024L;

  /** The number of valid key types possible in a store file */
  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;

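  /**
   * The block category for which cache-on-write is enabled. Each value maps to one configuration
   * key; modifyConf(Configuration) turns that key on and the other two off, so every test run
   * exercises a single category of blocks.
   */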
  private enum CacheOnWriteType {
    DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, BlockType.DATA, BlockType.ENCODED_DATA),
    BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, BlockType.BLOOM_CHUNK),
    INDEX_BLOCKS(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, BlockType.LEAF_INDEX,
      BlockType.INTERMEDIATE_INDEX);

    private final String confKey;
    private final BlockType blockType1;
    private final BlockType blockType2;

    CacheOnWriteType(String confKey, BlockType blockType) {
      this(confKey, blockType, blockType);
    }

    CacheOnWriteType(String confKey, BlockType blockType1, BlockType blockType2) {
      this.blockType1 = blockType1;
      this.blockType2 = blockType2;
      this.confKey = confKey;
    }

    public boolean shouldBeCached(BlockType blockType) {
      return blockType == blockType1 || blockType == blockType2;
    }

    public void modifyConf(Configuration conf) {
      for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
        conf.setBoolean(cowType.confKey, cowType == this);
      }
    }
  }

  public TestCacheOnWrite(CacheOnWriteType cowType, Compression.Algorithm compress,
    boolean cacheCompressedData, BlockCache blockCache) {
    this.cowType = cowType;
    this.compress = compress;
    this.cacheCompressedData = cacheCompressedData;
    this.blockCache = blockCache;
    testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress
      + ", cacheCompressedData=" + cacheCompressedData + "]";
    LOG.info(testDescription);
  }

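  /**
   * Builds the block cache flavors that the test is parameterized over: the default cache created
   * by {@link BlockCacheFactory}, an on-heap {@link LruBlockCache}, and an off-heap
   * {@link BucketCache}.
   */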
  private static List<BlockCache> getBlockCaches() throws IOException {
    Configuration conf = TEST_UTIL.getConfiguration();
    List<BlockCache> blockcaches = new ArrayList<>();
    // default
    blockcaches.add(BlockCacheFactory.createBlockCache(conf));

    // set LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME to 2.0f due to HBASE-16287
    TEST_UTIL.getConfiguration().setFloat(LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME,
      2.0f);
    // memory
    BlockCache lru = new LruBlockCache(128 * 1024 * 1024, 64 * 1024, TEST_UTIL.getConfiguration());
    blockcaches.add(lru);

    // bucket cache
    FileSystem.get(conf).mkdirs(TEST_UTIL.getDataTestDir());
    int[] bucketSizes =
      { INDEX_BLOCK_SIZE, DATA_BLOCK_SIZE, BLOOM_BLOCK_SIZE, 64 * 1024, 128 * 1024 };
    BlockCache bucketcache =
      new BucketCache("offheap", 128 * 1024 * 1024, 64 * 1024, bucketSizes, 5, 64 * 100, null);
    blockcaches.add(bucketcache);
    return blockcaches;
  }

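  /**
   * Test parameters: every combination of block cache implementation, cache-on-write type,
   * compression algorithm, and whether compressed data is cached in the block cache.
   */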
  @Parameters
  public static Collection<Object[]> getParameters() throws IOException {
    List<Object[]> params = new ArrayList<>();
    for (BlockCache blockCache : getBlockCaches()) {
      for (CacheOnWriteType cowType : CacheOnWriteType.values()) {
        for (Compression.Algorithm compress : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) {
          for (boolean cacheCompressedData : new boolean[] { false, true }) {
            params.add(new Object[] { cowType, compress, cacheCompressedData, blockCache });
          }
        }
      }
    }
    return params;
  }

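  /**
   * Clears the given block cache: an LruBlockCache is cleared directly, while other caches are
   * drained by evicting blocks until the block count reaches zero.
   */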
  private void clearBlockCache(BlockCache blockCache) throws InterruptedException {
    if (blockCache instanceof LruBlockCache) {
      ((LruBlockCache) blockCache).clearCache();
    } else {
      // BucketCache may not report all cached blocks right away (some may still be in the write
      // queue), so retry the eviction loop until the cache is empty.
      for (int clearCount = 0; blockCache.getBlockCount() > 0; clearCount++) {
        if (clearCount > 0) {
          LOG.warn("clear block cache " + blockCache + " " + clearCount + " times, "
            + blockCache.getBlockCount() + " blocks remaining");
          Thread.sleep(10);
        }
        for (CachedBlock block : Lists.newArrayList(blockCache)) {
          BlockCacheKey key = new BlockCacheKey(block.getFilename(), block.getOffset());
          // A combined block cache may need to evict the same block twice.
          for (int evictCount = 0; blockCache.evictBlock(key); evictCount++) {
            if (evictCount > 1) {
              LOG.warn("evict block " + block + " in " + blockCache + " " + evictCount
                + " times, maybe a bug here");
            }
          }
        }
      }
    }
  }

  @Before
  public void setUp() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    this.conf.set("dfs.datanode.data.dir.perm", "700");
    conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE);
    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, BLOOM_BLOCK_SIZE);
    conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData);
    cowType.modifyConf(conf);
    conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cowType.shouldBeCached(BlockType.DATA));
    conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
      cowType.shouldBeCached(BlockType.LEAF_INDEX));
    conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
      cowType.shouldBeCached(BlockType.BLOOM_CHUNK));
    cacheConf = new CacheConfig(conf, blockCache);
    fs = HFileSystem.get(conf);
  }

  @After
  public void tearDown() throws IOException, InterruptedException {
    clearBlockCache(blockCache);
  }

  @AfterClass
  public static void afterClass() throws IOException {
    TEST_UTIL.cleanupTestDir();
  }

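  /**
   * Writes a store file and then reads it back, verifying cache-on-write behavior for the
   * configured block type.
   */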
  private void testStoreFileCacheOnWriteInternals(boolean useTags) throws IOException {
    writeStoreFile(useTags);
    readStoreFile(useTags);
  }

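  /**
   * Reads back the store file written by writeStoreFile, asserting that exactly the block types
   * selected by the current CacheOnWriteType were cached at write time and that each cached block
   * matches the block read from disk, then releases the cached blocks.
   */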
  private void readStoreFile(boolean useTags) throws IOException {
    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
    LOG.info("HFile information: " + reader);
    HFileContext meta =
      new HFileContextBuilder().withCompression(compress).withBytesPerCheckSum(CKBYTES)
        .withChecksumType(ChecksumType.NULL).withBlockSize(DATA_BLOCK_SIZE)
        .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
        .withIncludesTags(useTags).build();
    final boolean cacheBlocks = false;
    final boolean pread = false;
    HFileScanner scanner = reader.getScanner(conf, cacheBlocks, pread);
    assertTrue(testDescription, scanner.seekTo());

    long offset = 0;
    EnumMap<BlockType, Integer> blockCountByType = new EnumMap<>(BlockType.class);

    DataBlockEncoding encodingInCache = NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding();
    List<Long> cachedBlocksOffset = new ArrayList<>();
    Map<Long, Pair<HFileBlock, HFileBlock>> cachedBlocks = new HashMap<>();
    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
      // Flags: don't cache the block, use pread, this is not a compaction.
      // Also, pass null for expected block type to avoid checking it.
      HFileBlock block =
        reader.readBlock(offset, -1, false, true, false, true, null, encodingInCache);
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
      HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true);
      boolean isCached = fromCache != null;
      cachedBlocksOffset.add(offset);
      cachedBlocks.put(offset, fromCache == null ? null : Pair.newPair(block, fromCache));
      boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
      assertTrue(
        "shouldBeCached: " + shouldBeCached + "\n" + "isCached: " + isCached + "\n"
          + "Test description: " + testDescription + "\n" + "block: " + block + "\n"
          + "encodingInCache: " + encodingInCache + "\n" + "blockCacheKey: " + blockCacheKey,
        shouldBeCached == isCached);
      if (isCached) {
        if (cacheConf.shouldCacheCompressed(fromCache.getBlockType().getCategory())) {
          if (compress != Compression.Algorithm.NONE) {
            assertFalse(fromCache.isUnpacked());
          }
          fromCache = fromCache.unpack(meta, reader.getUncachedBlockReader());
        } else {
          assertTrue(fromCache.isUnpacked());
        }
        // The block we cached at write time and the block read from the file should be identical.
        assertEquals(block.getChecksumType(), fromCache.getChecksumType());
        assertEquals(block.getBlockType(), fromCache.getBlockType());
        assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType());
        assertEquals(block.getOnDiskSizeWithHeader(), fromCache.getOnDiskSizeWithHeader());
        assertEquals(block.getOnDiskSizeWithoutHeader(), fromCache.getOnDiskSizeWithoutHeader());
        assertEquals(block.getUncompressedSizeWithoutHeader(),
          fromCache.getUncompressedSizeWithoutHeader());
      }
      offset += block.getOnDiskSizeWithHeader();
      BlockType bt = block.getBlockType();
      Integer count = blockCountByType.get(bt);
      blockCountByType.put(bt, (count == null ? 0 : count) + 1);
    }

    LOG.info("Block count by type: " + blockCountByType);
    String countByType = blockCountByType.toString();
    if (useTags) {
      assertEquals(
        "{" + BlockType.DATA + "=2663, LEAF_INDEX=297, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=32}",
        countByType);
    } else {
      assertEquals(
        "{" + BlockType.DATA + "=2498, LEAF_INDEX=278, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=31}",
        countByType);
    }

    // Iterate over all the KeyValues in the HFile.
    while (scanner.next()) {
      scanner.getCell();
    }
    Iterator<Long> iterator = cachedBlocksOffset.iterator();
    while (iterator.hasNext()) {
      Long entry = iterator.next();
      BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), entry);
      Pair<HFileBlock, HFileBlock> blockPair = cachedBlocks.get(entry);
      if (blockPair != null) {
        // Release twice because, in the isCached case, the reference count was incremented twice.
        // Note that we need to release two different blocks here; see the comments in
        // BucketCache#returnBlock.
        blockPair.getSecond().release();
        if (cacheCompressedData) {
          if (
            this.compress == Compression.Algorithm.NONE || cowType == CacheOnWriteType.INDEX_BLOCKS
              || cowType == CacheOnWriteType.BLOOM_BLOCKS
          ) {
            blockPair.getFirst().release();
          }
        } else {
          blockPair.getFirst().release();
        }
      }
    }
    scanner.shipped();
    reader.close();
  }

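  /**
   * Returns a random key type: a Put half of the time, otherwise one of the other valid key types
   * (never Minimum or Maximum).
   */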
  public static KeyValue.Type generateKeyType(Random rand) {
    if (rand.nextBoolean()) {
      // Let's make half of KVs puts.
      return KeyValue.Type.Put;
    } else {
      KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)];
      if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) {
        throw new RuntimeException("Generated an invalid key type: " + keyType + ". "
          + "Probably the layout of KeyValue.Type has changed.");
      }
      return keyType;
    }
  }

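  /**
   * Writes NUM_KV random KeyValues (optionally carrying a visibility tag) to a new store file and
   * records its path in storeFilePath.
   */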
  private void writeStoreFile(boolean useTags) throws IOException {
    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "test_cache_on_write");
    HFileContext meta =
      new HFileContextBuilder().withCompression(compress).withBytesPerCheckSum(CKBYTES)
        .withChecksumType(ChecksumType.NULL).withBlockSize(DATA_BLOCK_SIZE)
        .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
        .withIncludesTags(useTags).build();
    StoreFileWriter sfw =
      new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(storeFileParentDir)
        .withFileContext(meta).withBloomType(BLOOM_TYPE).withMaxKeyCount(NUM_KV).build();
    byte[] cf = Bytes.toBytes("fam");
    for (int i = 0; i < NUM_KV; ++i) {
      byte[] row = RandomKeyValueUtil.randomOrderedKey(rand, i);
      byte[] qualifier = RandomKeyValueUtil.randomRowOrQualifier(rand);
      byte[] value = RandomKeyValueUtil.randomValue(rand);
      KeyValue kv;
      if (useTags) {
        Tag t = new ArrayBackedTag((byte) 1, "visibility");
        List<Tag> tagList = new ArrayList<>();
        tagList.add(t);
        kv = new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
          Math.abs(rand.nextLong()), generateKeyType(rand), value, 0, value.length, tagList);
      } else {
        kv = new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length,
          Math.abs(rand.nextLong()), generateKeyType(rand), value, 0, value.length);
      }
      sfw.append(kv);
    }

    sfw.close();
    storeFilePath = sfw.getPath();
  }

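  /**
   * Loads a test region with several store files, compacts it, and then checks which block types
   * ended up in the block cache, honoring the cache-on-compaction flag and the optional size
   * threshold.
   */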
  private void testCachingDataBlocksDuringCompactionInternals(boolean useTags,
    boolean cacheBlocksOnCompaction, long cacheBlocksOnCompactionThreshold)
    throws IOException, InterruptedException {
    // Save the current conf values so they can be restored in the finally block.
    boolean localValue = conf.getBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, false);
    long localCacheCompactedBlocksThreshold =
      conf.getLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
        CacheConfig.DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD);
    boolean localCacheBloomBlocksValue = conf.getBoolean(
      CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, CacheConfig.DEFAULT_CACHE_BLOOMS_ON_WRITE);
    boolean localCacheIndexBlocksValue = conf.getBoolean(
      CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, CacheConfig.DEFAULT_CACHE_INDEXES_ON_WRITE);

    try {
      // Set the conf if testing caching compacted blocks on write
      conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, cacheBlocksOnCompaction);

      // set size threshold if testing compaction size threshold
      if (cacheBlocksOnCompactionThreshold > 0) {
        conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
          cacheBlocksOnCompactionThreshold);
      }

      // TODO: need to change this test if we add a cache size threshold for
      // compactions, or if we implement some other kind of intelligent logic for
      // deciding what blocks to cache-on-write on compaction.
      final String table = "CompactionCacheOnWrite";
      final String cf = "myCF";
      final byte[] cfBytes = Bytes.toBytes(cf);
      final int maxVersions = 3;
      ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(cfBytes)
        .setCompressionType(compress).setBloomFilterType(BLOOM_TYPE).setMaxVersions(maxVersions)
        .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()).build();
      HRegion region = TEST_UTIL.createTestRegion(table, cfd, blockCache);
      int rowIdx = 0;
      long ts = EnvironmentEdgeManager.currentTime();
      for (int iFile = 0; iFile < 5; ++iFile) {
        for (int iRow = 0; iRow < 500; ++iRow) {
          String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" + iRow;
          Put p = new Put(Bytes.toBytes(rowStr));
          ++rowIdx;
          for (int iCol = 0; iCol < 10; ++iCol) {
            String qualStr = "col" + iCol;
            String valueStr = "value_" + rowStr + "_" + qualStr;
            for (int iTS = 0; iTS < 5; ++iTS) {
              if (useTags) {
                Tag t = new ArrayBackedTag((byte) 1, "visibility");
                Tag[] tags = new Tag[1];
                tags[0] = t;
                KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
                  HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
                p.add(kv);
              } else {
                KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
                  ts++, Bytes.toBytes(valueStr));
                p.add(kv);
              }
            }
          }
          p.setDurability(Durability.ASYNC_WAL);
          region.put(p);
        }
        region.flush(true);
      }

      clearBlockCache(blockCache);
      assertEquals(0, blockCache.getBlockCount());

      region.compact(false);
      LOG.debug("compactStores() returned");

      boolean dataBlockCached = false;
      boolean bloomBlockCached = false;
      boolean indexBlockCached = false;

      for (CachedBlock block : blockCache) {
        if (DATA_BLOCK_TYPES.contains(block.getBlockType())) {
          dataBlockCached = true;
        } else if (BLOOM_BLOCK_TYPES.contains(block.getBlockType())) {
          bloomBlockCached = true;
        } else if (INDEX_BLOCK_TYPES.contains(block.getBlockType())) {
          indexBlockCached = true;
        }
      }

      // Data blocks should be cached in instances where we are caching blocks on write. When
      // testing a BucketCache we cannot verify the block type, as it is not stored in the cache.
      boolean cacheOnCompactAndNonBucketCache =
        cacheBlocksOnCompaction && !(blockCache instanceof BucketCache);

      String assertErrorMessage = "\nTest description: " + testDescription
        + "\ncacheBlocksOnCompaction: " + cacheBlocksOnCompaction + "\n";

      if (cacheOnCompactAndNonBucketCache && cacheBlocksOnCompactionThreshold > 0) {
        if (cacheBlocksOnCompactionThreshold == CACHE_COMPACTION_HIGH_THRESHOLD) {
          assertTrue(assertErrorMessage, dataBlockCached);
          assertTrue(assertErrorMessage, bloomBlockCached);
          assertTrue(assertErrorMessage, indexBlockCached);
        } else {
          assertFalse(assertErrorMessage, dataBlockCached);

          if (localCacheBloomBlocksValue) {
            assertTrue(assertErrorMessage, bloomBlockCached);
          } else {
            assertFalse(assertErrorMessage, bloomBlockCached);
          }

          if (localCacheIndexBlocksValue) {
            assertTrue(assertErrorMessage, indexBlockCached);
          } else {
            assertFalse(assertErrorMessage, indexBlockCached);
          }
        }
      } else {
        assertEquals(assertErrorMessage, cacheOnCompactAndNonBucketCache, dataBlockCached);

        if (cacheOnCompactAndNonBucketCache) {
          assertTrue(assertErrorMessage, bloomBlockCached);
          assertTrue(assertErrorMessage, indexBlockCached);
        }
      }

      region.close();
    } finally {
      // Restore the original conf values.
      conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, localValue);
      conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
        localCacheCompactedBlocksThreshold);
      conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, localCacheBloomBlocksValue);
      conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, localCacheIndexBlocksValue);
    }
  }

  @Test
  public void testStoreFileCacheOnWrite() throws IOException {
    testStoreFileCacheOnWriteInternals(false);
    testStoreFileCacheOnWriteInternals(true);
  }

  @Test
  public void testCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
    testCachingDataBlocksDuringCompactionInternals(false, false, -1);
    testCachingDataBlocksDuringCompactionInternals(true, true, -1);
  }

  @Test
  public void testCachingDataBlocksThresholdDuringCompaction()
    throws IOException, InterruptedException {
    testCachingDataBlocksDuringCompactionInternals(false, true, CACHE_COMPACTION_HIGH_THRESHOLD);
    testCachingDataBlocksDuringCompactionInternals(false, true, CACHE_COMPACTION_LOW_THRESHOLD);
  }

}