/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

/**
 * A kind of integration test at the intersection of {@link HFileBlock}, {@link CacheConfig}, and
 * {@link LruBlockCache}.
 */
@Category({ IOTests.class, SmallTests.class })
@RunWith(Parameterized.class)
public class TestLazyDataBlockDecompression {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLazyDataBlockDecompression.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestLazyDataBlockDecompression.class);
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final Random RNG = new Random(9713312); // Just a fixed seed.

  private FileSystem fs;

  @Parameterized.Parameter(0)
  public boolean cacheOnWrite;

  @Parameterized.Parameters
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] { { false }, { true } });
  }

  @Before
  public void setUp() throws IOException {
    fs = FileSystem.get(TEST_UTIL.getConfiguration());
  }

  @After
  public void tearDown() {
    fs = null;
  }
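
  // High-level flow: writeHFile() fills a new HFile with random KeyValues, cacheBlocks() reads
  // every data block back through a reader so the blocks land in the block cache, and the test
  // then compares cache occupancy and evictions with compressed caching disabled vs. enabled.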

  /**
   * Write {@code entryCount} random keyvalues to a new HFile at {@code path}.
   */
  private static void writeHFile(Configuration conf, CacheConfig cc, FileSystem fs, Path path,
    HFileContext cxt, int entryCount) throws IOException {
    HFile.Writer writer =
      new HFile.WriterFactory(conf, cc).withPath(fs, path).withFileContext(cxt).create();

    // write a bunch of random kvs
    final byte[] family = Bytes.toBytes("f");
    final byte[] qualifier = Bytes.toBytes("q");
    for (int i = 0; i < entryCount; i++) {
      byte[] keyBytes = RandomKeyValueUtil.randomOrderedKey(RNG, i);
      byte[] valueBytes = RandomKeyValueUtil.randomValue(RNG);
      // make a real keyvalue so that the hfile tool can examine it
      writer.append(new KeyValue(keyBytes, family, qualifier, valueBytes));
    }
    writer.close();
  }

  /**
   * Read all blocks from {@code path} to populate the block cache configured by
   * {@code cacheConfig}.
   */
  private static void cacheBlocks(Configuration conf, CacheConfig cacheConfig, FileSystem fs,
    Path path, HFileContext cxt) throws IOException {
    FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path);
    long fileSize = fs.getFileStatus(path).getLen();
    FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize);
    ReaderContext context = new ReaderContextBuilder().withFilePath(path).withFileSize(fileSize)
      .withFileSystem(fsdis.getHfs()).withInputStreamWrapper(fsdis).build();
    HFileInfo fileInfo = new HFileInfo(context, conf);
    HFile.Reader reader = new HFilePreadReader(context, fileInfo, cacheConfig, conf);
    fileInfo.initMetaAndIndex(reader);
    long offset = trailer.getFirstDataBlockOffset(), max = trailer.getLastDataBlockOffset();
    List<HFileBlock> blocks = new ArrayList<>(4);
    HFileBlock block;
    while (offset <= max) {
      block = reader.readBlock(offset, -1, /* cacheBlock */ true, /* pread */ false,
        /* isCompaction */ false, /* updateCacheMetrics */ true, null, null);
      offset += block.getOnDiskSizeWithHeader();
      blocks.add(block);
    }
    LOG.info("read " + Iterables.toString(blocks));
    reader.close();
  }

  @Test
  public void testCompressionIncreasesEffectiveBlockCacheSize() throws Exception {
    // enough room for 2 uncompressed blocks
    int maxSize = (int) (HConstants.DEFAULT_BLOCKSIZE * 2.1);
    Path hfilePath =
      new Path(TEST_UTIL.getDataTestDir(), "testCompressionIncreasesEffectiveBlockcacheSize");
    HFileContext context =
      new HFileContextBuilder().withCompression(Compression.Algorithm.GZ).build();
    LOG.info("context=" + context);

    // setup cache with lazy-decompression disabled.
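    // With CACHE_DATA_BLOCKS_COMPRESSED_KEY (hbase.block.data.cachecompressed) set to false,
    // each block is unpacked (decompressed) before it is inserted into the cache, so every
    // cached data block is charged at its full uncompressed size against maxSize.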
    Configuration lazyCompressDisabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
    lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
    lazyCompressDisabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
    lazyCompressDisabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
    CacheConfig cc = new CacheConfig(lazyCompressDisabled,
      new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled));
    assertFalse(cc.shouldCacheDataCompressed());
    assertFalse(cc.isCombinedBlockCache());
    LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache().get();
    LOG.info("disabledBlockCache=" + disabledBlockCache);
    assertEquals("test inconsistency detected.", maxSize, disabledBlockCache.getMaxSize());
    assertTrue("eviction thread spawned unintentionally.",
      disabledBlockCache.getEvictionThread() == null);
    assertEquals("freshly created blockcache contains blocks.", 0,
      disabledBlockCache.getBlockCount());

    // 2000 kvs is ~3.6 full unencoded data blocks.
    // Requires a conf and CacheConfig but should not be specific to this instance's cache
    // settings.
    writeHFile(lazyCompressDisabled, cc, fs, hfilePath, context, 2000);

    // populate the cache
    cacheBlocks(lazyCompressDisabled, cc, fs, hfilePath, context);
    long disabledBlockCount = disabledBlockCache.getBlockCount();
    assertTrue("blockcache should contain blocks. disabledBlockCount=" + disabledBlockCount,
      disabledBlockCount > 0);
    long disabledEvictedCount = disabledBlockCache.getStats().getEvictedCount();
    for (Map.Entry<BlockCacheKey, LruCachedBlock> e : disabledBlockCache.getMapForTests()
      .entrySet()) {
      HFileBlock block = (HFileBlock) e.getValue().getBuffer();
      assertTrue("found a packed block, block=" + block, block.isUnpacked());
    }

    // count blocks with lazy decompression
    Configuration lazyCompressEnabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
    lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
    lazyCompressEnabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
    lazyCompressEnabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
    cc = new CacheConfig(lazyCompressEnabled,
      new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressEnabled));
    assertTrue("test improperly configured.", cc.shouldCacheDataCompressed());
    assertTrue(cc.getBlockCache().get() instanceof LruBlockCache);
    LruBlockCache enabledBlockCache = (LruBlockCache) cc.getBlockCache().get();
    LOG.info("enabledBlockCache=" + enabledBlockCache);
    assertEquals("test inconsistency detected.", maxSize, enabledBlockCache.getMaxSize());
    assertTrue("eviction thread spawned unintentionally.",
      enabledBlockCache.getEvictionThread() == null);
    assertEquals("freshly created blockcache contains blocks.", 0,
      enabledBlockCache.getBlockCount());

    cacheBlocks(lazyCompressEnabled, cc, fs, hfilePath, context);
    long enabledBlockCount = enabledBlockCache.getBlockCount();
    assertTrue("blockcache should contain blocks. enabledBlockCount=" + enabledBlockCount,
      enabledBlockCount > 0);
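    // Only DATA blocks are eligible for compressed caching: shouldCacheCompressed() returns
    // false for every other block category, so index and bloom blocks are still cached
    // unpacked. That is why the loop below checks the category before asserting packed-ness.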
enabledBlockCount=" + enabledBlockCount, 203 enabledBlockCount > 0); 204 long enabledEvictedCount = enabledBlockCache.getStats().getEvictedCount(); 205 int candidatesFound = 0; 206 for (Map.Entry<BlockCacheKey, LruCachedBlock> e : enabledBlockCache.getMapForTests() 207 .entrySet()) { 208 candidatesFound++; 209 HFileBlock block = (HFileBlock) e.getValue().getBuffer(); 210 if (cc.shouldCacheCompressed(block.getBlockType().getCategory())) { 211 assertFalse("found an unpacked block, block=" + block + ", block buffer capacity=" 212 + block.getBufferWithoutHeader().capacity(), block.isUnpacked()); 213 } 214 } 215 assertTrue("did not find any candidates for compressed caching. Invalid test.", 216 candidatesFound > 0); 217 218 LOG.info( 219 "disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" + enabledBlockCount); 220 assertTrue( 221 "enabling compressed data blocks should increase the effective cache size. " 222 + "disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" + enabledBlockCount, 223 disabledBlockCount < enabledBlockCount); 224 225 LOG.info("disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" 226 + enabledEvictedCount); 227 assertTrue("enabling compressed data blocks should reduce the number of evictions. " 228 + "disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" 229 + enabledEvictedCount, enabledEvictedCount < disabledEvictedCount); 230 } 231}