001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.EnumMap; 029import java.util.HashMap; 030import java.util.Iterator; 031import java.util.List; 032import java.util.Map; 033import java.util.Random; 034import java.util.Set; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.FileSystem; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.hbase.ArrayBackedTag; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 041import org.apache.hadoop.hbase.HBaseTestingUtil; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.KeyValue; 044import org.apache.hadoop.hbase.Tag; 045import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 047import org.apache.hadoop.hbase.client.Durability; 048import org.apache.hadoop.hbase.client.Put; 049import org.apache.hadoop.hbase.fs.HFileSystem; 050import org.apache.hadoop.hbase.io.compress.Compression; 051import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 052import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 053import org.apache.hadoop.hbase.regionserver.BloomType; 054import org.apache.hadoop.hbase.regionserver.HRegion; 055import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 056import org.apache.hadoop.hbase.testclassification.IOTests; 057import org.apache.hadoop.hbase.testclassification.LargeTests; 058import org.apache.hadoop.hbase.util.BloomFilterFactory; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.apache.hadoop.hbase.util.ChecksumType; 061import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 062import org.apache.hadoop.hbase.util.Pair; 063import org.junit.After; 064import org.junit.AfterClass; 065import org.junit.Before; 066import org.junit.ClassRule; 067import org.junit.Test; 068import org.junit.experimental.categories.Category; 069import org.junit.runner.RunWith; 070import org.junit.runners.Parameterized; 071import org.junit.runners.Parameterized.Parameters; 072import org.slf4j.Logger; 073import org.slf4j.LoggerFactory; 074 075import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 076import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 077 078/** 079 * Tests {@link HFile} cache-on-write functionality for the following block types: data blocks, 080 * non-root index blocks, and Bloom filter blocks. 081 */ 082@RunWith(Parameterized.class) 083@Category({ IOTests.class, LargeTests.class }) 084public class TestCacheOnWrite { 085 086 @ClassRule 087 public static final HBaseClassTestRule CLASS_RULE = 088 HBaseClassTestRule.forClass(TestCacheOnWrite.class); 089 090 private static final Logger LOG = LoggerFactory.getLogger(TestCacheOnWrite.class); 091 092 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 093 private Configuration conf; 094 private CacheConfig cacheConf; 095 private FileSystem fs; 096 private Random rand = new Random(12983177L); 097 private Path storeFilePath; 098 private BlockCache blockCache; 099 private String testDescription; 100 101 private final CacheOnWriteType cowType; 102 private final Compression.Algorithm compress; 103 private final boolean cacheCompressedData; 104 105 private static final int DATA_BLOCK_SIZE = 2048; 106 private static final int NUM_KV = 25000; 107 private static final int INDEX_BLOCK_SIZE = 512; 108 private static final int BLOOM_BLOCK_SIZE = 4096; 109 private static final BloomType BLOOM_TYPE = BloomType.ROWCOL; 110 private static final int CKBYTES = 512; 111 112 private static final Set<BlockType> INDEX_BLOCK_TYPES = ImmutableSet.of(BlockType.INDEX_V1, 113 BlockType.INTERMEDIATE_INDEX, BlockType.ROOT_INDEX, BlockType.LEAF_INDEX); 114 private static final Set<BlockType> BLOOM_BLOCK_TYPES = ImmutableSet.of(BlockType.BLOOM_CHUNK, 115 BlockType.GENERAL_BLOOM_META, BlockType.DELETE_FAMILY_BLOOM_META); 116 private static final Set<BlockType> DATA_BLOCK_TYPES = 117 ImmutableSet.of(BlockType.ENCODED_DATA, BlockType.DATA); 118 119 // All test cases are supposed to generate files for compaction within this range 120 private static final long CACHE_COMPACTION_LOW_THRESHOLD = 10L; 121 private static final long CACHE_COMPACTION_HIGH_THRESHOLD = 1 * 1024 * 1024 * 1024L; 122 123 /** The number of valid key types possible in a store file */ 124 private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2; 125 126 private enum CacheOnWriteType { 127 DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, BlockType.DATA, BlockType.ENCODED_DATA), 128 BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, BlockType.BLOOM_CHUNK), 129 INDEX_BLOCKS(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, BlockType.LEAF_INDEX, 130 BlockType.INTERMEDIATE_INDEX); 131 132 private final String confKey; 133 private final BlockType blockType1; 134 private final BlockType blockType2; 135 136 CacheOnWriteType(String confKey, BlockType blockType) { 137 this(confKey, blockType, blockType); 138 } 139 140 CacheOnWriteType(String confKey, BlockType blockType1, BlockType blockType2) { 141 this.blockType1 = blockType1; 142 this.blockType2 = blockType2; 143 this.confKey = confKey; 144 } 145 146 public boolean shouldBeCached(BlockType blockType) { 147 return blockType == blockType1 || blockType == blockType2; 148 } 149 150 public void modifyConf(Configuration conf) { 151 for (CacheOnWriteType cowType : CacheOnWriteType.values()) { 152 conf.setBoolean(cowType.confKey, cowType == this); 153 } 154 } 155 } 156 157 public TestCacheOnWrite(CacheOnWriteType cowType, Compression.Algorithm compress, 158 boolean cacheCompressedData, BlockCache blockCache) { 159 this.cowType = cowType; 160 this.compress = compress; 161 this.cacheCompressedData = cacheCompressedData; 162 this.blockCache = blockCache; 163 testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress 164 + ", cacheCompressedData=" + cacheCompressedData + "]"; 165 LOG.info(testDescription); 166 } 167 168 private static List<BlockCache> getBlockCaches() throws IOException { 169 Configuration conf = TEST_UTIL.getConfiguration(); 170 List<BlockCache> blockcaches = new ArrayList<>(); 171 // default 172 blockcaches.add(BlockCacheFactory.createBlockCache(conf)); 173 174 // set LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME to 2.0f due to HBASE-16287 175 TEST_UTIL.getConfiguration().setFloat(LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, 176 2.0f); 177 // memory 178 BlockCache lru = new LruBlockCache(128 * 1024 * 1024, 64 * 1024, TEST_UTIL.getConfiguration()); 179 blockcaches.add(lru); 180 181 // bucket cache 182 FileSystem.get(conf).mkdirs(TEST_UTIL.getDataTestDir()); 183 int[] bucketSizes = 184 { INDEX_BLOCK_SIZE, DATA_BLOCK_SIZE, BLOOM_BLOCK_SIZE, 64 * 1024, 128 * 1024 }; 185 BlockCache bucketcache = 186 new BucketCache("offheap", 128 * 1024 * 1024, 64 * 1024, bucketSizes, 5, 64 * 100, null); 187 blockcaches.add(bucketcache); 188 return blockcaches; 189 } 190 191 @Parameters 192 public static Collection<Object[]> getParameters() throws IOException { 193 List<Object[]> params = new ArrayList<>(); 194 for (BlockCache blockCache : getBlockCaches()) { 195 for (CacheOnWriteType cowType : CacheOnWriteType.values()) { 196 for (Compression.Algorithm compress : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) { 197 for (boolean cacheCompressedData : new boolean[] { false, true }) { 198 params.add(new Object[] { cowType, compress, cacheCompressedData, blockCache }); 199 } 200 } 201 } 202 } 203 return params; 204 } 205 206 private void clearBlockCache(BlockCache blockCache) throws InterruptedException { 207 if (blockCache instanceof LruBlockCache) { 208 ((LruBlockCache) blockCache).clearCache(); 209 } else { 210 // BucketCache may not return all cached blocks(blocks in write queue), so check it here. 211 for (int clearCount = 0; blockCache.getBlockCount() > 0; clearCount++) { 212 if (clearCount > 0) { 213 LOG.warn("clear block cache " + blockCache + " " + clearCount + " times, " 214 + blockCache.getBlockCount() + " blocks remaining"); 215 Thread.sleep(10); 216 } 217 for (CachedBlock block : Lists.newArrayList(blockCache)) { 218 BlockCacheKey key = new BlockCacheKey(block.getFilename(), block.getOffset()); 219 // CombinedBucketCache may need evict two times. 220 for (int evictCount = 0; blockCache.evictBlock(key); evictCount++) { 221 if (evictCount > 1) { 222 LOG.warn("evict block " + block + " in " + blockCache + " " + evictCount 223 + " times, maybe a bug here"); 224 } 225 } 226 } 227 } 228 } 229 } 230 231 @Before 232 public void setUp() throws IOException { 233 conf = TEST_UTIL.getConfiguration(); 234 this.conf.set("dfs.datanode.data.dir.perm", "700"); 235 conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE); 236 conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, BLOOM_BLOCK_SIZE); 237 conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData); 238 cowType.modifyConf(conf); 239 conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cowType.shouldBeCached(BlockType.DATA)); 240 conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, 241 cowType.shouldBeCached(BlockType.LEAF_INDEX)); 242 conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, 243 cowType.shouldBeCached(BlockType.BLOOM_CHUNK)); 244 cacheConf = new CacheConfig(conf, blockCache); 245 fs = HFileSystem.get(conf); 246 } 247 248 @After 249 public void tearDown() throws IOException, InterruptedException { 250 clearBlockCache(blockCache); 251 } 252 253 @AfterClass 254 public static void afterClass() throws IOException { 255 TEST_UTIL.cleanupTestDir(); 256 } 257 258 private void testStoreFileCacheOnWriteInternals(boolean useTags) throws IOException { 259 writeStoreFile(useTags); 260 readStoreFile(useTags); 261 } 262 263 private void readStoreFile(boolean useTags) throws IOException { 264 HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); 265 LOG.info("HFile information: " + reader); 266 HFileContext meta = 267 new HFileContextBuilder().withCompression(compress).withBytesPerCheckSum(CKBYTES) 268 .withChecksumType(ChecksumType.NULL).withBlockSize(DATA_BLOCK_SIZE) 269 .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()) 270 .withIncludesTags(useTags).build(); 271 final boolean cacheBlocks = false; 272 final boolean pread = false; 273 HFileScanner scanner = reader.getScanner(conf, cacheBlocks, pread); 274 assertTrue(testDescription, scanner.seekTo()); 275 276 long offset = 0; 277 EnumMap<BlockType, Integer> blockCountByType = new EnumMap<>(BlockType.class); 278 279 DataBlockEncoding encodingInCache = NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding(); 280 List<Long> cachedBlocksOffset = new ArrayList<>(); 281 Map<Long, Pair<HFileBlock, HFileBlock>> cachedBlocks = new HashMap<>(); 282 while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { 283 // Flags: don't cache the block, use pread, this is not a compaction. 284 // Also, pass null for expected block type to avoid checking it. 285 HFileBlock block = 286 reader.readBlock(offset, -1, false, true, false, true, null, encodingInCache); 287 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); 288 HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true); 289 boolean isCached = fromCache != null; 290 cachedBlocksOffset.add(offset); 291 cachedBlocks.put(offset, fromCache == null ? null : Pair.newPair(block, fromCache)); 292 boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType()); 293 assertTrue( 294 "shouldBeCached: " + shouldBeCached + "\n" + "isCached: " + isCached + "\n" 295 + "Test description: " + testDescription + "\n" + "block: " + block + "\n" 296 + "encodingInCache: " + encodingInCache + "\n" + "blockCacheKey: " + blockCacheKey, 297 shouldBeCached == isCached); 298 if (isCached) { 299 if (cacheConf.shouldCacheCompressed(fromCache.getBlockType().getCategory())) { 300 if (compress != Compression.Algorithm.NONE) { 301 assertFalse(fromCache.isUnpacked()); 302 } 303 fromCache = fromCache.unpack(meta, reader.getUncachedBlockReader()); 304 } else { 305 assertTrue(fromCache.isUnpacked()); 306 } 307 // block we cached at write-time and block read from file should be identical 308 assertEquals(block.getChecksumType(), fromCache.getChecksumType()); 309 assertEquals(block.getBlockType(), fromCache.getBlockType()); 310 assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType()); 311 assertEquals(block.getOnDiskSizeWithHeader(), fromCache.getOnDiskSizeWithHeader()); 312 assertEquals(block.getOnDiskSizeWithoutHeader(), fromCache.getOnDiskSizeWithoutHeader()); 313 assertEquals(block.getUncompressedSizeWithoutHeader(), 314 fromCache.getUncompressedSizeWithoutHeader()); 315 } 316 offset += block.getOnDiskSizeWithHeader(); 317 BlockType bt = block.getBlockType(); 318 Integer count = blockCountByType.get(bt); 319 blockCountByType.put(bt, (count == null ? 0 : count) + 1); 320 } 321 322 LOG.info("Block count by type: " + blockCountByType); 323 String countByType = blockCountByType.toString(); 324 if (useTags) { 325 assertEquals( 326 "{" + BlockType.DATA + "=2663, LEAF_INDEX=297, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=32}", 327 countByType); 328 } else { 329 assertEquals( 330 "{" + BlockType.DATA + "=2498, LEAF_INDEX=278, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=31}", 331 countByType); 332 } 333 334 // iterate all the keyvalue from hfile 335 while (scanner.next()) { 336 scanner.getCell(); 337 } 338 Iterator<Long> iterator = cachedBlocksOffset.iterator(); 339 while (iterator.hasNext()) { 340 Long entry = iterator.next(); 341 BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), entry); 342 Pair<HFileBlock, HFileBlock> blockPair = cachedBlocks.get(entry); 343 if (blockPair != null) { 344 // Call return twice because for the isCache cased the counter would have got incremented 345 // twice. Notice that here we need to returnBlock with different blocks. see comments in 346 // BucketCache#returnBlock. 347 blockPair.getSecond().release(); 348 if (cacheCompressedData) { 349 if ( 350 this.compress == Compression.Algorithm.NONE || cowType == CacheOnWriteType.INDEX_BLOCKS 351 || cowType == CacheOnWriteType.BLOOM_BLOCKS 352 ) { 353 blockPair.getFirst().release(); 354 } 355 } else { 356 blockPair.getFirst().release(); 357 } 358 } 359 } 360 scanner.shipped(); 361 reader.close(); 362 } 363 364 public static KeyValue.Type generateKeyType(Random rand) { 365 if (rand.nextBoolean()) { 366 // Let's make half of KVs puts. 367 return KeyValue.Type.Put; 368 } else { 369 KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; 370 if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { 371 throw new RuntimeException("Generated an invalid key type: " + keyType + ". " 372 + "Probably the layout of KeyValue.Type has changed."); 373 } 374 return keyType; 375 } 376 } 377 378 private void writeStoreFile(boolean useTags) throws IOException { 379 Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "test_cache_on_write"); 380 HFileContext meta = 381 new HFileContextBuilder().withCompression(compress).withBytesPerCheckSum(CKBYTES) 382 .withChecksumType(ChecksumType.NULL).withBlockSize(DATA_BLOCK_SIZE) 383 .withDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()) 384 .withIncludesTags(useTags).build(); 385 StoreFileWriter sfw = 386 new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(storeFileParentDir) 387 .withFileContext(meta).withBloomType(BLOOM_TYPE).withMaxKeyCount(NUM_KV).build(); 388 byte[] cf = Bytes.toBytes("fam"); 389 for (int i = 0; i < NUM_KV; ++i) { 390 byte[] row = RandomKeyValueUtil.randomOrderedKey(rand, i); 391 byte[] qualifier = RandomKeyValueUtil.randomRowOrQualifier(rand); 392 byte[] value = RandomKeyValueUtil.randomValue(rand); 393 KeyValue kv; 394 if (useTags) { 395 Tag t = new ArrayBackedTag((byte) 1, "visibility"); 396 List<Tag> tagList = new ArrayList<>(); 397 tagList.add(t); 398 Tag[] tags = new Tag[1]; 399 tags[0] = t; 400 kv = new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length, 401 Math.abs(rand.nextLong()), generateKeyType(rand), value, 0, value.length, tagList); 402 } else { 403 kv = new KeyValue(row, 0, row.length, cf, 0, cf.length, qualifier, 0, qualifier.length, 404 Math.abs(rand.nextLong()), generateKeyType(rand), value, 0, value.length); 405 } 406 sfw.append(kv); 407 } 408 409 sfw.close(); 410 storeFilePath = sfw.getPath(); 411 } 412 413 private void testCachingDataBlocksDuringCompactionInternals(boolean useTags, 414 boolean cacheBlocksOnCompaction, long cacheBlocksOnCompactionThreshold) 415 throws IOException, InterruptedException { 416 // create a localConf 417 boolean localValue = conf.getBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, false); 418 long localCacheCompactedBlocksThreshold = 419 conf.getLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY, 420 CacheConfig.DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD); 421 boolean localCacheBloomBlocksValue = conf.getBoolean( 422 CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, CacheConfig.DEFAULT_CACHE_BLOOMS_ON_WRITE); 423 boolean localCacheIndexBlocksValue = conf.getBoolean( 424 CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, CacheConfig.DEFAULT_CACHE_INDEXES_ON_WRITE); 425 426 try { 427 // Set the conf if testing caching compacted blocks on write 428 conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, cacheBlocksOnCompaction); 429 430 // set size threshold if testing compaction size threshold 431 if (cacheBlocksOnCompactionThreshold > 0) { 432 conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY, 433 cacheBlocksOnCompactionThreshold); 434 } 435 436 // TODO: need to change this test if we add a cache size threshold for 437 // compactions, or if we implement some other kind of intelligent logic for 438 // deciding what blocks to cache-on-write on compaction. 439 final String table = "CompactionCacheOnWrite"; 440 final String cf = "myCF"; 441 final byte[] cfBytes = Bytes.toBytes(cf); 442 final int maxVersions = 3; 443 ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(cfBytes) 444 .setCompressionType(compress).setBloomFilterType(BLOOM_TYPE).setMaxVersions(maxVersions) 445 .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()).build(); 446 HRegion region = TEST_UTIL.createTestRegion(table, cfd, blockCache); 447 int rowIdx = 0; 448 long ts = EnvironmentEdgeManager.currentTime(); 449 for (int iFile = 0; iFile < 5; ++iFile) { 450 for (int iRow = 0; iRow < 500; ++iRow) { 451 String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" + iRow; 452 Put p = new Put(Bytes.toBytes(rowStr)); 453 ++rowIdx; 454 for (int iCol = 0; iCol < 10; ++iCol) { 455 String qualStr = "col" + iCol; 456 String valueStr = "value_" + rowStr + "_" + qualStr; 457 for (int iTS = 0; iTS < 5; ++iTS) { 458 if (useTags) { 459 Tag t = new ArrayBackedTag((byte) 1, "visibility"); 460 Tag[] tags = new Tag[1]; 461 tags[0] = t; 462 KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr), 463 HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags); 464 p.add(kv); 465 } else { 466 KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr), 467 ts++, Bytes.toBytes(valueStr)); 468 p.add(kv); 469 } 470 } 471 } 472 p.setDurability(Durability.ASYNC_WAL); 473 region.put(p); 474 } 475 region.flush(true); 476 } 477 478 clearBlockCache(blockCache); 479 assertEquals(0, blockCache.getBlockCount()); 480 481 region.compact(false); 482 LOG.debug("compactStores() returned"); 483 484 boolean dataBlockCached = false; 485 boolean bloomBlockCached = false; 486 boolean indexBlockCached = false; 487 488 for (CachedBlock block : blockCache) { 489 if (DATA_BLOCK_TYPES.contains(block.getBlockType())) { 490 dataBlockCached = true; 491 } else if (BLOOM_BLOCK_TYPES.contains(block.getBlockType())) { 492 bloomBlockCached = true; 493 } else if (INDEX_BLOCK_TYPES.contains(block.getBlockType())) { 494 indexBlockCached = true; 495 } 496 } 497 498 // Data blocks should be cached in instances where we are caching blocks on write. In the case 499 // of testing 500 // BucketCache, we cannot verify block type as it is not stored in the cache. 501 boolean cacheOnCompactAndNonBucketCache = 502 cacheBlocksOnCompaction && !(blockCache instanceof BucketCache); 503 504 String assertErrorMessage = "\nTest description: " + testDescription 505 + "\ncacheBlocksOnCompaction: " + cacheBlocksOnCompaction + "\n"; 506 507 if (cacheOnCompactAndNonBucketCache && cacheBlocksOnCompactionThreshold > 0) { 508 if (cacheBlocksOnCompactionThreshold == CACHE_COMPACTION_HIGH_THRESHOLD) { 509 assertTrue(assertErrorMessage, dataBlockCached); 510 assertTrue(assertErrorMessage, bloomBlockCached); 511 assertTrue(assertErrorMessage, indexBlockCached); 512 } else { 513 assertFalse(assertErrorMessage, dataBlockCached); 514 515 if (localCacheBloomBlocksValue) { 516 assertTrue(assertErrorMessage, bloomBlockCached); 517 } else { 518 assertFalse(assertErrorMessage, bloomBlockCached); 519 } 520 521 if (localCacheIndexBlocksValue) { 522 assertTrue(assertErrorMessage, indexBlockCached); 523 } else { 524 assertFalse(assertErrorMessage, indexBlockCached); 525 } 526 } 527 } else { 528 assertEquals(assertErrorMessage, cacheOnCompactAndNonBucketCache, dataBlockCached); 529 530 if (cacheOnCompactAndNonBucketCache) { 531 assertTrue(assertErrorMessage, bloomBlockCached); 532 assertTrue(assertErrorMessage, indexBlockCached); 533 } 534 } 535 536 region.close(); 537 } finally { 538 // reset back 539 conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, localValue); 540 conf.setLong(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY, 541 localCacheCompactedBlocksThreshold); 542 conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, localCacheBloomBlocksValue); 543 conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, localCacheIndexBlocksValue); 544 } 545 } 546 547 @Test 548 public void testStoreFileCacheOnWrite() throws IOException { 549 testStoreFileCacheOnWriteInternals(false); 550 testStoreFileCacheOnWriteInternals(true); 551 } 552 553 @Test 554 public void testCachingDataBlocksDuringCompaction() throws IOException, InterruptedException { 555 testCachingDataBlocksDuringCompactionInternals(false, false, -1); 556 testCachingDataBlocksDuringCompactionInternals(true, true, -1); 557 } 558 559 @Test 560 public void testCachingDataBlocksThresholdDuringCompaction() 561 throws IOException, InterruptedException { 562 testCachingDataBlocksDuringCompactionInternals(false, true, CACHE_COMPACTION_HIGH_THRESHOLD); 563 testCachingDataBlocksDuringCompactionInternals(false, true, CACHE_COMPACTION_LOW_THRESHOLD); 564 } 565 566}