001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
021import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
022import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
023import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
024import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
025import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY;
026import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MAX_CHUNK_SIZE_KEY;
027import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MIN_INDEX_NUM_ENTRIES_KEY;
028import static org.junit.Assert.assertEquals;
029
030import java.io.IOException;
031import java.util.Arrays;
032import java.util.Collection;
033import java.util.Random;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.ExtendedCell;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseTestingUtil;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.KeyValue;
042import org.apache.hadoop.hbase.io.ByteBuffAllocator;
043import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
044import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
045import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl.HFileScannerImpl;
046import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
047import org.apache.hadoop.hbase.io.hfile.bucket.TestBucketCache;
048import org.apache.hadoop.hbase.testclassification.IOTests;
049import org.apache.hadoop.hbase.testclassification.LargeTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
052import org.junit.After;
053import org.junit.Assert;
054import org.junit.Before;
055import org.junit.BeforeClass;
056import org.junit.ClassRule;
057import org.junit.Rule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.junit.rules.TestName;
061import org.junit.runner.RunWith;
062import org.junit.runners.Parameterized;
063import org.junit.runners.Parameterized.Parameter;
064import org.junit.runners.Parameterized.Parameters;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068@RunWith(Parameterized.class)
069@Category({ IOTests.class, LargeTests.class })
070public class TestHFileScannerImplReferenceCount {
071
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074    HBaseClassTestRule.forClass(TestHFileScannerImplReferenceCount.class);
075
076  @Rule
077  public TestName CASE = new TestName();
078
079  @Parameters(name = "{index}: ioengine={0}")
080  public static Collection<Object[]> data() {
081    return Arrays.asList(new Object[] { "file" }, new Object[] { "offheap" },
082      new Object[] { "mmap" }, new Object[] { "pmem" });
083  }
084
085  @Parameter
086  public String ioengine;
087
088  private static final Logger LOG =
089    LoggerFactory.getLogger(TestHFileScannerImplReferenceCount.class);
090  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
091  private static final Random RNG = new Random(9713312); // Just a fixed seed.
092  private static final byte[] FAMILY = Bytes.toBytes("f");
093  private static final byte[] QUALIFIER = Bytes.toBytes("q");
094  private static final byte[] SUFFIX = randLongBytes();
095  private static final int CELL_COUNT = 1000;
096
097  private static byte[] randLongBytes() {
098    byte[] keys = new byte[30];
099    Bytes.random(keys);
100    return keys;
101  }
102
103  // It's a deep copy of configuration of UTIL, DON'T use shallow copy.
104  private Configuration conf;
105  private Path workDir;
106  private FileSystem fs;
107  private Path hfilePath;
108  private ExtendedCell firstCell = null;
109  private ExtendedCell secondCell = null;
110  private ByteBuffAllocator allocator;
111
112  @BeforeClass
113  public static void setUpBeforeClass() {
114    Configuration conf = UTIL.getConfiguration();
115    // Set the max chunk size and min entries key to be very small for index block, so that we can
116    // create an index block tree with level >= 2.
117    conf.setInt(MAX_CHUNK_SIZE_KEY, 10);
118    conf.setInt(MIN_INDEX_NUM_ENTRIES_KEY, 2);
119    // Create a bucket cache with 32MB.
120    conf.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
121    conf.setInt(BUCKET_CACHE_SIZE_KEY, 32);
122    conf.setInt(BUFFER_SIZE_KEY, 1024);
123    conf.setInt(MAX_BUFFER_COUNT_KEY, 32 * 1024);
124    // All allocated ByteBuff are pooled ByteBuff.
125    conf.setInt(MIN_ALLOCATE_SIZE_KEY, 0);
126  }
127
128  @Before
129  public void setUp() throws IOException {
130    String caseName = CASE.getMethodName().replaceAll("[^a-zA-Z0-9]", "_");
131    this.workDir = UTIL.getDataTestDir(caseName);
132    if (!"offheap".equals(ioengine)) {
133      ioengine = ioengine + ":" + workDir.toString() + "/cachedata";
134    }
135    UTIL.getConfiguration().set(BUCKET_CACHE_IOENGINE_KEY, ioengine);
136    this.firstCell = null;
137    this.secondCell = null;
138    this.allocator = ByteBuffAllocator.create(UTIL.getConfiguration(), true);
139    this.conf = new Configuration(UTIL.getConfiguration());
140    this.fs = this.workDir.getFileSystem(conf);
141    this.hfilePath = new Path(this.workDir, caseName + EnvironmentEdgeManager.currentTime());
142    LOG.info("Start to write {} cells into hfile: {}, case:{}", CELL_COUNT, hfilePath, caseName);
143  }
144
145  @After
146  public void tearDown() throws IOException {
147    this.allocator.clean();
148    this.fs.delete(this.workDir, true);
149  }
150
151  private void waitBucketCacheFlushed(BlockCache cache) throws InterruptedException {
152    Assert.assertTrue(cache instanceof CombinedBlockCache);
153    BlockCache[] blockCaches = cache.getBlockCaches();
154    Assert.assertEquals(blockCaches.length, 2);
155    Assert.assertTrue(blockCaches[1] instanceof BucketCache);
156    TestBucketCache.waitUntilAllFlushedToBucket((BucketCache) blockCaches[1]);
157  }
158
159  private void writeHFile(Configuration conf, FileSystem fs, Path hfilePath, Algorithm compression,
160    DataBlockEncoding encoding, int cellCount) throws IOException {
161    HFileContext context =
162      new HFileContextBuilder().withBlockSize(1).withDataBlockEncoding(DataBlockEncoding.NONE)
163        .withCompression(compression).withDataBlockEncoding(encoding).build();
164    try (HFile.Writer writer = new HFile.WriterFactory(conf, new CacheConfig(conf))
165      .withPath(fs, hfilePath).withFileContext(context).create()) {
166      for (int i = 0; i < cellCount; ++i) {
167        byte[] keyBytes = Bytes.add(Bytes.toBytes(i), SUFFIX);
168        // A random-length random value.
169        byte[] valueBytes = RandomKeyValueUtil.randomValue(RNG);
170        KeyValue keyValue =
171          new KeyValue(keyBytes, FAMILY, QUALIFIER, HConstants.LATEST_TIMESTAMP, valueBytes);
172        if (firstCell == null) {
173          firstCell = keyValue;
174        } else if (secondCell == null) {
175          secondCell = keyValue;
176        }
177        writer.append(keyValue);
178      }
179    }
180  }
181
182  /**
183   * A careful UT for validating the reference count mechanism, if want to change this UT please
184   * read the design doc in HBASE-21879 firstly and make sure that understand the refCnt design.
185   */
186  private void testReleaseBlock(Algorithm compression, DataBlockEncoding encoding)
187    throws Exception {
188    writeHFile(conf, fs, hfilePath, compression, encoding, CELL_COUNT);
189    HFileBlock curBlock, prevBlock;
190    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
191    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
192    Assert.assertNotNull(defaultBC);
193    Assert.assertTrue(cacheConfig.isCombinedBlockCache());
194    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
195    Assert.assertTrue(reader instanceof HFileReaderImpl);
196    // We've build a HFile tree with index = 16.
197    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
198
199    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false);
200    HFileBlock block1 = reader.getDataBlockIndexReader()
201      .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE, reader)
202      .getHFileBlock();
203    waitBucketCacheFlushed(defaultBC);
204    Assert.assertTrue(block1.getBlockType().isData());
205    Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
206
207    HFileBlock block2 = reader.getDataBlockIndexReader().loadDataBlockWithScanInfo(secondCell, null,
208      true, true, false, DataBlockEncoding.NONE, reader).getHFileBlock();
209    waitBucketCacheFlushed(defaultBC);
210    Assert.assertTrue(block2.getBlockType().isData());
211    Assert.assertFalse(block2 instanceof ExclusiveMemHFileBlock);
212    // Only one refCnt for RPC path.
213    Assert.assertEquals(block1.refCnt(), 1);
214    Assert.assertEquals(block2.refCnt(), 1);
215    Assert.assertFalse(block1 == block2);
216
217    scanner.seekTo(firstCell);
218    curBlock = scanner.curBlock;
219    this.assertRefCnt(curBlock, 2);
220
221    // Seek to the block again, the curBlock won't change and won't read from BlockCache. so
222    // refCnt should be unchanged.
223    scanner.seekTo(firstCell);
224    Assert.assertTrue(curBlock == scanner.curBlock);
225    this.assertRefCnt(curBlock, 2);
226    prevBlock = curBlock;
227
228    scanner.seekTo(secondCell);
229    curBlock = scanner.curBlock;
230    this.assertRefCnt(prevBlock, 2);
231    this.assertRefCnt(curBlock, 2);
232
233    // After shipped, the prevBlock will be release, but curBlock is still referenced by the
234    // curBlock.
235    scanner.shipped();
236    this.assertRefCnt(prevBlock, 1);
237    this.assertRefCnt(curBlock, 2);
238
239    // Try to ship again, though with nothing to client.
240    scanner.shipped();
241    this.assertRefCnt(prevBlock, 1);
242    this.assertRefCnt(curBlock, 2);
243
244    // The curBlock will also be released.
245    scanner.close();
246    this.assertRefCnt(curBlock, 1);
247
248    // Finish the block & block2 RPC path
249    Assert.assertTrue(block1.release());
250    Assert.assertTrue(block2.release());
251
252    // Evict the LRUBlockCache
253    Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
254    Assert.assertEquals(prevBlock.refCnt(), 0);
255    Assert.assertEquals(curBlock.refCnt(), 0);
256
257    int count = 0;
258    Assert.assertTrue(scanner.seekTo());
259    ++count;
260    while (scanner.next()) {
261      count++;
262    }
263    assertEquals(CELL_COUNT, count);
264  }
265
266  /**
267   * See HBASE-22480
268   */
269  @Test
270  public void testSeekBefore() throws Exception {
271    HFileBlock curBlock, prevBlock;
272    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
273    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
274    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
275    Assert.assertNotNull(defaultBC);
276    Assert.assertTrue(cacheConfig.isCombinedBlockCache());
277    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
278    Assert.assertTrue(reader instanceof HFileReaderImpl);
279    // We've build a HFile tree with index = 16.
280    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
281
282    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false);
283    HFileBlock block1 = reader.getDataBlockIndexReader()
284      .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE, reader)
285      .getHFileBlock();
286    Assert.assertTrue(block1.getBlockType().isData());
287    Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
288    HFileBlock block2 = reader.getDataBlockIndexReader().loadDataBlockWithScanInfo(secondCell, null,
289      true, true, false, DataBlockEncoding.NONE, reader).getHFileBlock();
290    Assert.assertTrue(block2.getBlockType().isData());
291    Assert.assertFalse(block2 instanceof ExclusiveMemHFileBlock);
292    // Wait until flushed to IOEngine;
293    waitBucketCacheFlushed(defaultBC);
294    // One RPC reference path.
295    Assert.assertEquals(block1.refCnt(), 1);
296    Assert.assertEquals(block2.refCnt(), 1);
297
298    // Let the curBlock refer to block2.
299    scanner.seekTo(secondCell);
300    curBlock = scanner.curBlock;
301    Assert.assertFalse(curBlock == block2);
302    Assert.assertEquals(1, block2.refCnt());
303    this.assertRefCnt(curBlock, 2);
304    prevBlock = scanner.curBlock;
305
306    // Release the block1, no other reference.
307    Assert.assertTrue(block1.release());
308    Assert.assertEquals(0, block1.refCnt());
309    // Release the block2, no other reference.
310    Assert.assertTrue(block2.release());
311    Assert.assertEquals(0, block2.refCnt());
312
313    // Do the seekBefore: the newBlock will be the previous block of curBlock.
314    Assert.assertTrue(scanner.seekBefore(secondCell));
315    Assert.assertEquals(scanner.prevBlocks.size(), 1);
316    Assert.assertTrue(scanner.prevBlocks.get(0) == prevBlock);
317    curBlock = scanner.curBlock;
318    // the curBlock is read from IOEngine, so a different block.
319    Assert.assertFalse(curBlock == block1);
320    // Two reference for curBlock: 1. scanner; 2. blockCache.
321    this.assertRefCnt(curBlock, 2);
322    // Reference count of prevBlock must be unchanged because we haven't shipped.
323    this.assertRefCnt(prevBlock, 2);
324
325    // Do the shipped
326    scanner.shipped();
327    Assert.assertEquals(scanner.prevBlocks.size(), 0);
328    Assert.assertNotNull(scanner.curBlock);
329    this.assertRefCnt(curBlock, 2);
330    this.assertRefCnt(prevBlock, 1);
331
332    // Do the close
333    scanner.close();
334    Assert.assertNull(scanner.curBlock);
335    this.assertRefCnt(curBlock, 1);
336    this.assertRefCnt(prevBlock, 1);
337
338    Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
339    Assert.assertEquals(0, curBlock.refCnt());
340    Assert.assertEquals(0, prevBlock.refCnt());
341
342    // Reload the block1 again.
343    block1 = reader.getDataBlockIndexReader()
344      .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE, reader)
345      .getHFileBlock();
346    // Wait until flushed to IOEngine;
347    waitBucketCacheFlushed(defaultBC);
348    Assert.assertTrue(block1.getBlockType().isData());
349    Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
350    Assert.assertTrue(block1.release());
351    Assert.assertEquals(0, block1.refCnt());
352    // Re-seek to the begin.
353    Assert.assertTrue(scanner.seekTo());
354    curBlock = scanner.curBlock;
355    Assert.assertFalse(curBlock == block1);
356    this.assertRefCnt(curBlock, 2);
357    // Return false because firstCell <= c[0]
358    Assert.assertFalse(scanner.seekBefore(firstCell));
359    // The block1 shouldn't be released because we still don't do the shipped or close.
360    this.assertRefCnt(curBlock, 2);
361
362    scanner.close();
363    this.assertRefCnt(curBlock, 1);
364    Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 1);
365    Assert.assertEquals(0, curBlock.refCnt());
366  }
367
368  private void assertRefCnt(HFileBlock block, int value) {
369    if (ioengine.startsWith("offheap") || ioengine.startsWith("pmem")) {
370      Assert.assertEquals(value, block.refCnt());
371    } else {
372      Assert.assertEquals(value - 1, block.refCnt());
373    }
374  }
375
376  @Test
377  public void testDefault() throws Exception {
378    testReleaseBlock(Algorithm.NONE, DataBlockEncoding.NONE);
379  }
380
381  @Test
382  public void testCompression() throws Exception {
383    testReleaseBlock(Algorithm.GZ, DataBlockEncoding.NONE);
384  }
385
386  @Test
387  public void testDataBlockEncoding() throws Exception {
388    testReleaseBlock(Algorithm.NONE, DataBlockEncoding.ROW_INDEX_V1);
389  }
390
391  @Test
392  public void testDataBlockEncodingAndCompression() throws Exception {
393    testReleaseBlock(Algorithm.GZ, DataBlockEncoding.ROW_INDEX_V1);
394  }
395
396  @Test
397  public void testWithLruBlockCache() throws Exception {
398    HFileBlock curBlock;
399    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
400    // Set LruBlockCache
401    conf.set(BUCKET_CACHE_IOENGINE_KEY, "");
402    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
403    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
404    Assert.assertNotNull(defaultBC);
405    Assert.assertFalse(cacheConfig.isCombinedBlockCache()); // Must be LruBlockCache.
406    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
407    Assert.assertTrue(reader instanceof HFileReaderImpl);
408    // We've build a HFile tree with index = 16.
409    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
410
411    HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false);
412    HFileBlock block1 = reader.getDataBlockIndexReader()
413      .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE, reader)
414      .getHFileBlock();
415    Assert.assertTrue(block1.getBlockType().isData());
416    Assert.assertTrue(block1 instanceof ExclusiveMemHFileBlock);
417    HFileBlock block2 = reader.getDataBlockIndexReader().loadDataBlockWithScanInfo(secondCell, null,
418      true, true, false, DataBlockEncoding.NONE, reader).getHFileBlock();
419    Assert.assertTrue(block2.getBlockType().isData());
420    Assert.assertTrue(block2 instanceof ExclusiveMemHFileBlock);
421    // One RPC reference path.
422    Assert.assertEquals(block1.refCnt(), 0);
423    Assert.assertEquals(block2.refCnt(), 0);
424
425    scanner.seekTo(firstCell);
426    curBlock = scanner.curBlock;
427    Assert.assertTrue(curBlock == block1);
428    Assert.assertEquals(curBlock.refCnt(), 0);
429    Assert.assertTrue(scanner.prevBlocks.isEmpty());
430
431    // Switch to next block
432    scanner.seekTo(secondCell);
433    curBlock = scanner.curBlock;
434    Assert.assertTrue(curBlock == block2);
435    Assert.assertEquals(curBlock.refCnt(), 0);
436    Assert.assertEquals(curBlock.retain().refCnt(), 0);
437    // Only pooled HFileBlock will be kept in prevBlocks and ExclusiveMemHFileBlock will never keep
438    // in prevBlocks.
439    Assert.assertTrue(scanner.prevBlocks.isEmpty());
440
441    // close the scanner
442    scanner.close();
443    Assert.assertNull(scanner.curBlock);
444    Assert.assertTrue(scanner.prevBlocks.isEmpty());
445  }
446
447  @Test
448  public void testDisabledBlockCache() throws Exception {
449    writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
450    // Set LruBlockCache
451    conf.setFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
452    BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
453    Assert.assertNull(defaultBC);
454    CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
455    Assert.assertFalse(cacheConfig.isCombinedBlockCache()); // Must be LruBlockCache.
456    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
457    Assert.assertTrue(reader instanceof HFileReaderImpl);
458    // We've build a HFile tree with index = 16.
459    Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
460
461    HFileBlock block1 = reader.getDataBlockIndexReader()
462      .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE, reader)
463      .getHFileBlock();
464
465    Assert.assertTrue(block1.isSharedMem());
466    Assert.assertTrue(block1 instanceof SharedMemHFileBlock);
467    Assert.assertEquals(1, block1.refCnt());
468    Assert.assertTrue(block1.release());
469  }
470}