/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MAX_CHUNK_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MIN_INDEX_NUM_ENTRIES_KEY;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl.HFileScannerImpl;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.hfile.bucket.TestBucketCache;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
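
/**
 * Tests that {@link HFileScannerImpl} keeps the reference counts of the underlying
 * {@link HFileBlock}s correct across seekTo/seekBefore, shipped() and close(), parameterized over
 * the bucket cache ioengines. See HBASE-21879 for the refCnt design and HBASE-22480 for the
 * seekBefore() case covered below.
 */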
@RunWith(Parameterized.class)
@Category({ IOTests.class, LargeTests.class })
public class TestHFileScannerImplReferenceCount {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHFileScannerImplReferenceCount.class);

  @Rule
  public TestName CASE = new TestName();

  @Parameters(name = "{index}: ioengine={0}")
  public static Collection<Object[]> data() {
    return Arrays.asList(new Object[] { "file" }, new Object[] { "offheap" },
      new Object[] { "mmap" }, new Object[] { "pmem" });
  }

  @Parameter
  public String ioengine;

  private static final Logger LOG =
    LoggerFactory.getLogger(TestHFileScannerImplReferenceCount.class);
  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static final Random RNG = new Random(9713312); // Just a fixed seed.
  private static final byte[] FAMILY = Bytes.toBytes("f");
  private static final byte[] QUALIFIER = Bytes.toBytes("q");
  private static final byte[] SUFFIX = randLongBytes();
  private static final int CELL_COUNT = 1000;

  private static byte[] randLongBytes() {
    byte[] keys = new byte[30];
    Bytes.random(keys);
    return keys;
  }

  // A deep copy of UTIL's configuration; DON'T use a shallow copy.
  private Configuration conf;
  private Path workDir;
  private FileSystem fs;
  private Path hfilePath;
  private ExtendedCell firstCell = null;
  private ExtendedCell secondCell = null;
  private ByteBuffAllocator allocator;

  @BeforeClass
  public static void setUpBeforeClass() {
    Configuration conf = UTIL.getConfiguration();
    // Set the max chunk size and the min number of entries per index block to very small values,
    // so that we get an index block tree with level >= 2.
    conf.setInt(MAX_CHUNK_SIZE_KEY, 10);
    conf.setInt(MIN_INDEX_NUM_ENTRIES_KEY, 2);
    // Create a 32 MB bucket cache.
    conf.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setInt(BUCKET_CACHE_SIZE_KEY, 32);
    conf.setInt(BUFFER_SIZE_KEY, 1024);
    conf.setInt(MAX_BUFFER_COUNT_KEY, 32 * 1024);
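    // With every read served from the pooled allocator, data blocks on the scan path should come
    // back as reference-counted SharedMemHFileBlocks rather than on-heap ExclusiveMemHFileBlocks,
    // which is what the refCnt assertions in the tests below rely on.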
    // All allocated ByteBuffs are pooled ByteBuffs.
    conf.setInt(MIN_ALLOCATE_SIZE_KEY, 0);
  }

  @Before
  public void setUp() throws IOException {
    String caseName = CASE.getMethodName().replaceAll("[^a-zA-Z0-9]", "_");
    this.workDir = UTIL.getDataTestDir(caseName);
    if (!"offheap".equals(ioengine)) {
      ioengine = ioengine + ":" + workDir.toString() + "/cachedata";
    }
    UTIL.getConfiguration().set(BUCKET_CACHE_IOENGINE_KEY, ioengine);
    this.firstCell = null;
    this.secondCell = null;
    this.allocator = ByteBuffAllocator.create(UTIL.getConfiguration(), true);
    this.conf = new Configuration(UTIL.getConfiguration());
    this.fs = this.workDir.getFileSystem(conf);
    this.hfilePath = new Path(this.workDir, caseName + EnvironmentEdgeManager.currentTime());
    LOG.info("Start to write {} cells into hfile: {}, case: {}", CELL_COUNT, hfilePath, caseName);
  }

  @After
  public void tearDown() throws IOException {
    this.allocator.clean();
    this.fs.delete(this.workDir, true);
  }

  private void waitBucketCacheFlushed(BlockCache cache) throws InterruptedException {
    Assert.assertTrue(cache instanceof CombinedBlockCache);
    BlockCache[] blockCaches = cache.getBlockCaches();
    Assert.assertEquals(2, blockCaches.length);
    Assert.assertTrue(blockCaches[1] instanceof BucketCache);
    TestBucketCache.waitUntilAllFlushedToBucket((BucketCache) blockCaches[1]);
  }

  private void writeHFile(Configuration conf, FileSystem fs, Path hfilePath, Algorithm compression,
    DataBlockEncoding encoding, int cellCount) throws IOException {
    HFileContext context = new HFileContextBuilder().withBlockSize(1)
      .withCompression(compression).withDataBlockEncoding(encoding).build();
    try (HFile.Writer writer = new HFile.WriterFactory(conf, new CacheConfig(conf))
      .withPath(fs, hfilePath).withFileContext(context).create()) {
      for (int i = 0; i < cellCount; ++i) {
        byte[] keyBytes = Bytes.add(Bytes.toBytes(i), SUFFIX);
        // A random-length random value.
        byte[] valueBytes = RandomKeyValueUtil.randomValue(RNG);
        KeyValue keyValue =
          new KeyValue(keyBytes, FAMILY, QUALIFIER, HConstants.LATEST_TIMESTAMP, valueBytes);
        if (firstCell == null) {
          firstCell = keyValue;
        } else if (secondCell == null) {
          secondCell = keyValue;
        }
        writer.append(keyValue);
      }
    }
  }
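
  // A rough sketch of the refCnt transitions exercised below (see the design doc in HBASE-21879
  // for the authoritative description):
  //
  //   loadDataBlockWithScanInfo(...)  -> refCnt == 1, held by the RPC/read path
  //   scanner.seekTo(cell)            -> curBlock also retained by the scanner: refCnt == 2
  //   scanner.seekTo(laterCell)       -> the old curBlock moves into scanner.prevBlocks
  //   scanner.shipped()               -> every block in prevBlocks is released
  //   scanner.close()                 -> curBlock is released
  //   evictBlocksByHfileName(...)     -> the cache reference goes away: refCnt reaches 0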

  /**
   * A careful UT for validating the reference count mechanism: if you want to change this UT,
   * please read the design doc in HBASE-21879 first and make sure you understand the refCnt
   * design.
   */
  private void testReleaseBlock(Algorithm compression, DataBlockEncoding encoding)
    throws Exception {
    writeHFile(conf, fs, hfilePath, compression, encoding, CELL_COUNT);
    HFileBlock curBlock, prevBlock;
    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
    Assert.assertNotNull(defaultBC);
    Assert.assertTrue(cacheConfig.isCombinedBlockCache());
    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
    Assert.assertTrue(reader instanceof HFileReaderImpl);
    // We've built an HFile whose data block index has 16 levels.
    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());

    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false);
    HFileBlock block1 = reader.getDataBlockIndexReader()
      .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE,
        reader)
      .getHFileBlock();
    waitBucketCacheFlushed(defaultBC);
    Assert.assertTrue(block1.getBlockType().isData());
    Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);

    HFileBlock block2 = reader.getDataBlockIndexReader().loadDataBlockWithScanInfo(secondCell,
      null, true, true, false, DataBlockEncoding.NONE, reader).getHFileBlock();
    waitBucketCacheFlushed(defaultBC);
    Assert.assertTrue(block2.getBlockType().isData());
    Assert.assertFalse(block2 instanceof ExclusiveMemHFileBlock);
    // Only one reference each, held by the RPC path.
    Assert.assertEquals(1, block1.refCnt());
    Assert.assertEquals(1, block2.refCnt());
    Assert.assertFalse(block1 == block2);

    scanner.seekTo(firstCell);
    curBlock = scanner.curBlock;
    this.assertRefCnt(curBlock, 2);

    // Seek to the same block again: curBlock won't change and nothing is read from the
    // BlockCache, so the refCnt should be unchanged.
    scanner.seekTo(firstCell);
    Assert.assertTrue(curBlock == scanner.curBlock);
    this.assertRefCnt(curBlock, 2);
    prevBlock = curBlock;

    scanner.seekTo(secondCell);
    curBlock = scanner.curBlock;
    this.assertRefCnt(prevBlock, 2);
    this.assertRefCnt(curBlock, 2);

    // After shipped(), prevBlock is released, while curBlock is still referenced by the scanner.
    scanner.shipped();
    this.assertRefCnt(prevBlock, 1);
    this.assertRefCnt(curBlock, 2);

    // Ship again, although nothing new went to the client.
    scanner.shipped();
    this.assertRefCnt(prevBlock, 1);
    this.assertRefCnt(curBlock, 2);

    // Closing the scanner releases curBlock as well.
    scanner.close();
    this.assertRefCnt(curBlock, 1);

    // Finish the block1 & block2 RPC paths.
    Assert.assertTrue(block1.release());
    Assert.assertTrue(block2.release());

    // Evict the blocks of this file from the block cache.
    Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
    Assert.assertEquals(0, prevBlock.refCnt());
    Assert.assertEquals(0, curBlock.refCnt());

    int count = 0;
    Assert.assertTrue(scanner.seekTo());
    ++count;
    while (scanner.next()) {
      count++;
    }
    assertEquals(CELL_COUNT, count);
  }
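
  // seekBefore() makes the block preceding the current one the new curBlock; the old curBlock
  // must be parked in scanner.prevBlocks and stay referenced until shipped() runs, rather than
  // being released eagerly. HBASE-22480 tracks a refCnt bug on exactly this path, which the test
  // below guards against.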

  /**
   * See HBASE-22480.
   */
  @Test
  public void testSeekBefore() throws Exception {
    HFileBlock curBlock, prevBlock;
    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
    Assert.assertNotNull(defaultBC);
    Assert.assertTrue(cacheConfig.isCombinedBlockCache());
    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
    Assert.assertTrue(reader instanceof HFileReaderImpl);
    // We've built an HFile whose data block index has 16 levels.
    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());

    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false);
    HFileBlock block1 = reader.getDataBlockIndexReader()
      .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE,
        reader)
      .getHFileBlock();
    Assert.assertTrue(block1.getBlockType().isData());
    Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
    HFileBlock block2 = reader.getDataBlockIndexReader().loadDataBlockWithScanInfo(secondCell,
      null, true, true, false, DataBlockEncoding.NONE, reader).getHFileBlock();
    Assert.assertTrue(block2.getBlockType().isData());
    Assert.assertFalse(block2 instanceof ExclusiveMemHFileBlock);
    // Wait until flushed to the IOEngine.
    waitBucketCacheFlushed(defaultBC);
    // One reference each, held by the RPC path.
    Assert.assertEquals(1, block1.refCnt());
    Assert.assertEquals(1, block2.refCnt());

    // Let curBlock refer to block2.
    scanner.seekTo(secondCell);
    curBlock = scanner.curBlock;
    Assert.assertFalse(curBlock == block2);
    Assert.assertEquals(1, block2.refCnt());
    this.assertRefCnt(curBlock, 2);
    prevBlock = scanner.curBlock;

    // Release block1; no other reference remains.
    Assert.assertTrue(block1.release());
    Assert.assertEquals(0, block1.refCnt());
    // Release block2; no other reference remains.
    Assert.assertTrue(block2.release());
    Assert.assertEquals(0, block2.refCnt());

    // Do the seekBefore: the new curBlock will be the block preceding the old curBlock.
    Assert.assertTrue(scanner.seekBefore(secondCell));
    Assert.assertEquals(1, scanner.prevBlocks.size());
    Assert.assertTrue(scanner.prevBlocks.get(0) == prevBlock);
    curBlock = scanner.curBlock;
    // curBlock was read from the IOEngine, so it's a different block.
    Assert.assertFalse(curBlock == block1);
    // Two references for curBlock: 1. the scanner; 2. the block cache.
    this.assertRefCnt(curBlock, 2);
    // The refCnt of prevBlock must be unchanged because we haven't shipped yet.
    this.assertRefCnt(prevBlock, 2);

    // Do the shipped.
    scanner.shipped();
    Assert.assertEquals(0, scanner.prevBlocks.size());
    Assert.assertNotNull(scanner.curBlock);
    this.assertRefCnt(curBlock, 2);
    this.assertRefCnt(prevBlock, 1);

    // Do the close.
    scanner.close();
    Assert.assertNull(scanner.curBlock);
    this.assertRefCnt(curBlock, 1);
    this.assertRefCnt(prevBlock, 1);

    Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
    Assert.assertEquals(0, curBlock.refCnt());
    Assert.assertEquals(0, prevBlock.refCnt());

    // Reload block1 again.
    block1 = reader.getDataBlockIndexReader()
      .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE,
        reader)
      .getHFileBlock();
    // Wait until flushed to the IOEngine.
    waitBucketCacheFlushed(defaultBC);
    Assert.assertTrue(block1.getBlockType().isData());
    Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
    Assert.assertTrue(block1.release());
    Assert.assertEquals(0, block1.refCnt());
    // Re-seek to the beginning.
    Assert.assertTrue(scanner.seekTo());
    curBlock = scanner.curBlock;
    Assert.assertFalse(curBlock == block1);
    this.assertRefCnt(curBlock, 2);
    // Returns false because there is no cell before firstCell.
    Assert.assertFalse(scanner.seekBefore(firstCell));
    // curBlock shouldn't be released because we haven't done the shipped() or close() yet.
    this.assertRefCnt(curBlock, 2);

    scanner.close();
    this.assertRefCnt(curBlock, 1);
    Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 1);
    Assert.assertEquals(0, curBlock.refCnt());
  }
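
  // Roughly why the expected refCnt depends on the ioengine: the offheap and pmem engines serve
  // blocks backed by the bucket cache's own memory, so the bucket entry itself holds one extra
  // reference; the file and mmap engines copy each block into a fresh buffer on read, hence one
  // reference fewer is observed there.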
  private void assertRefCnt(HFileBlock block, int value) {
    if (ioengine.startsWith("offheap") || ioengine.startsWith("pmem")) {
      Assert.assertEquals(value, block.refCnt());
    } else {
      Assert.assertEquals(value - 1, block.refCnt());
    }
  }

  @Test
  public void testDefault() throws Exception {
    testReleaseBlock(Algorithm.NONE, DataBlockEncoding.NONE);
  }

  @Test
  public void testCompression() throws Exception {
    testReleaseBlock(Algorithm.GZ, DataBlockEncoding.NONE);
  }

  @Test
  public void testDataBlockEncoding() throws Exception {
    testReleaseBlock(Algorithm.NONE, DataBlockEncoding.ROW_INDEX_V1);
  }

  @Test
  public void testDataBlockEncodingAndCompression() throws Exception {
    testReleaseBlock(Algorithm.GZ, DataBlockEncoding.ROW_INDEX_V1);
  }

  @Test
  public void testWithLruBlockCache() throws Exception {
    HFileBlock curBlock;
    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
    // Use an LruBlockCache only: clearing the ioengine key disables the bucket cache.
    conf.set(BUCKET_CACHE_IOENGINE_KEY, "");
    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
    Assert.assertNotNull(defaultBC);
    Assert.assertFalse(cacheConfig.isCombinedBlockCache()); // Must be LruBlockCache.
    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
    Assert.assertTrue(reader instanceof HFileReaderImpl);
    // We've built an HFile whose data block index has 16 levels.
    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());

    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false);
    HFileBlock block1 = reader.getDataBlockIndexReader()
      .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE,
        reader)
      .getHFileBlock();
    Assert.assertTrue(block1.getBlockType().isData());
    Assert.assertTrue(block1 instanceof ExclusiveMemHFileBlock);
    HFileBlock block2 = reader.getDataBlockIndexReader().loadDataBlockWithScanInfo(secondCell,
      null, true, true, false, DataBlockEncoding.NONE, reader).getHFileBlock();
    Assert.assertTrue(block2.getBlockType().isData());
    Assert.assertTrue(block2 instanceof ExclusiveMemHFileBlock);
    // ExclusiveMemHFileBlock is on-heap and not reference counted, so refCnt stays 0 even on the
    // RPC path.
    Assert.assertEquals(0, block1.refCnt());
    Assert.assertEquals(0, block2.refCnt());

    scanner.seekTo(firstCell);
    curBlock = scanner.curBlock;
    Assert.assertTrue(curBlock == block1);
    Assert.assertEquals(0, curBlock.refCnt());
    Assert.assertTrue(scanner.prevBlocks.isEmpty());

    // Switch to the next block.
    scanner.seekTo(secondCell);
    curBlock = scanner.curBlock;
    Assert.assertTrue(curBlock == block2);
    Assert.assertEquals(0, curBlock.refCnt());
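    // retain() is a no-op here: ExclusiveMemHFileBlock lives on the heap and is GC-managed, not
    // reference counted, so refCnt stays 0.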
    Assert.assertEquals(0, curBlock.retain().refCnt());
    // Only pooled HFileBlocks are kept in prevBlocks; an ExclusiveMemHFileBlock is never added to
    // prevBlocks.
    Assert.assertTrue(scanner.prevBlocks.isEmpty());

    // Close the scanner.
    scanner.close();
    Assert.assertNull(scanner.curBlock);
    Assert.assertTrue(scanner.prevBlocks.isEmpty());
  }

  @Test
  public void testDisabledBlockCache() throws Exception {
    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
    // Disable the block cache entirely.
    conf.setFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
    Assert.assertNull(defaultBC);
    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
    Assert.assertFalse(cacheConfig.isCombinedBlockCache()); // No block cache at all.
    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
    Assert.assertTrue(reader instanceof HFileReaderImpl);
    // We've built an HFile whose data block index has 16 levels.
    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());

    HFileBlock block1 = reader.getDataBlockIndexReader()
      .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE,
        reader)
      .getHFileBlock();

    // With no block cache, the block is read straight into a pooled buffer: shared memory, with
    // one reference held by the read (RPC) path.
    Assert.assertTrue(block1.isSharedMem());
    Assert.assertTrue(block1 instanceof SharedMemHFileBlock);
    Assert.assertEquals(1, block1.refCnt());
    Assert.assertTrue(block1.release());
  }
}