/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category({ IOTests.class, MediumTests.class })
public class TestHFileBlockUnpack {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHFileBlockUnpack.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  // repetition gives us some chance to get a good compression ratio
  private static float CHANCE_TO_REPEAT = 0.6f;

  private static final int MIN_ALLOCATION_SIZE = 10 * 1024;

  ByteBuffAllocator allocator;

  @Rule
  public TestName name = new TestName();
  private FileSystem fs;

  @Before
  public void setUp() throws Exception {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
    Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, MIN_ALLOCATION_SIZE);
    allocator = ByteBuffAllocator.create(conf, true);
  }

  /**
   * It's important that if you read and unpack the same HFileBlock twice, it results in an
   * identical buffer each time. Otherwise we end up with validation failures in block cache, since
   * contents may not match if the same block is cached twice. See
   * https://issues.apache.org/jira/browse/HBASE-27053
   */
  @Test
  public void itUnpacksIdenticallyEachTime() throws IOException {
    Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName());
    int totalSize = createTestBlock(path);

    // Allocate a bunch of random buffers, so we can be sure that unpack will only have "dirty"
    // buffers to choose from when allocating itself.
    Random random = new Random();
    byte[] temp = new byte[HConstants.DEFAULT_BLOCKSIZE];
    List<ByteBuff> buffs = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      ByteBuff buff = allocator.allocate(HConstants.DEFAULT_BLOCKSIZE);
      random.nextBytes(temp);
      buff.put(temp);
      buffs.add(buff);
    }

    buffs.forEach(ByteBuff::release);

    // read the same block twice. we should expect the underlying buffer below to
    // be identical each time
    HFileBlockWrapper blockOne = readBlock(path, totalSize);
    HFileBlockWrapper blockTwo = readBlock(path, totalSize);

    // first check size fields
    assertEquals(blockOne.original.getOnDiskSizeWithHeader(),
      blockTwo.original.getOnDiskSizeWithHeader());
    assertEquals(blockOne.original.getUncompressedSizeWithoutHeader(),
      blockTwo.original.getUncompressedSizeWithoutHeader());

    // next check packed buffers
    assertBuffersEqual(blockOne.original.getBufferWithoutHeader(),
      blockTwo.original.getBufferWithoutHeader(),
      blockOne.original.getOnDiskDataSizeWithHeader() - blockOne.original.headerSize());

    // now check unpacked buffers. prior to HBASE-27053, this would fail because
    // the unpacked buffer would include extra space for checksums at the end that was not written.
    // so the checksum space would be filled with random junk when re-using pooled buffers.
    assertBuffersEqual(blockOne.unpacked.getBufferWithoutHeader(),
      blockTwo.unpacked.getBufferWithoutHeader(),
      blockOne.original.getUncompressedSizeWithoutHeader());
  }

  private void assertBuffersEqual(ByteBuff bufferOne, ByteBuff bufferTwo, int expectedSize) {
    assertEquals(expectedSize, bufferOne.limit());
    assertEquals(expectedSize, bufferTwo.limit());
    assertEquals(0,
      ByteBuff.compareTo(bufferOne, 0, bufferOne.limit(), bufferTwo, 0, bufferTwo.limit()));
  }

  /**
   * If the block on disk size is less than {@link ByteBuffAllocator}'s min allocation size, that
   * block will be allocated to heap regardless of desire for off-heap. After de-compressing the
   * block, the new size may now exceed the min allocation size. This test ensures that those
   * de-compressed blocks, which will be allocated off-heap, are properly marked as
   * {@link HFileBlock#isSharedMem()} == true. See https://issues.apache.org/jira/browse/HBASE-27170
   */
  @Test
  public void itUsesSharedMemoryIfUnpackedBlockExceedsMinAllocationSize() throws IOException {
    Path path = new Path(TEST_UTIL.getDataTestDir(), name.getMethodName());
    int totalSize = createTestBlock(path);
    HFileBlockWrapper blockFromHFile = readBlock(path, totalSize);

    assertFalse("expected hfile block to NOT be unpacked", blockFromHFile.original.isUnpacked());
    assertFalse("expected hfile block to NOT use shared memory",
      blockFromHFile.original.isSharedMem());

    assertTrue(
      "expected generated block size " + blockFromHFile.original.getOnDiskSizeWithHeader()
        + " to be less than " + MIN_ALLOCATION_SIZE,
      blockFromHFile.original.getOnDiskSizeWithHeader() < MIN_ALLOCATION_SIZE);
    assertTrue(
      "expected generated block uncompressed size "
        + blockFromHFile.original.getUncompressedSizeWithoutHeader() + " to be more than "
        + MIN_ALLOCATION_SIZE,
      blockFromHFile.original.getUncompressedSizeWithoutHeader() > MIN_ALLOCATION_SIZE);

    assertTrue("expected unpacked block to be unpacked", blockFromHFile.unpacked.isUnpacked());
    assertTrue("expected unpacked block to use shared memory",
      blockFromHFile.unpacked.isSharedMem());
  }

  private final static class HFileBlockWrapper {
    private final HFileBlock original;
    private final HFileBlock unpacked;

    private HFileBlockWrapper(HFileBlock original, HFileBlock unpacked) {
      this.original = original;
      this.unpacked = unpacked;
    }
  }

  private HFileBlockWrapper readBlock(Path path, int totalSize) throws IOException {
    try (FSDataInputStream is = fs.open(path)) {
      HFileContext meta =
        new HFileContextBuilder().withHBaseCheckSum(true).withCompression(Compression.Algorithm.GZ)
          .withIncludesMvcc(false).withIncludesTags(false).build();
      ReaderContext context =
        new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
          .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
      HFileBlock.FSReaderImpl hbr =
        new HFileBlock.FSReaderImpl(context, meta, allocator, TEST_UTIL.getConfiguration());
      hbr.setDataBlockEncoder(NoOpDataBlockEncoder.INSTANCE, TEST_UTIL.getConfiguration());
      hbr.setIncludesMemStoreTS(false);
      HFileBlock blockFromHFile = hbr.readBlockData(0, -1, false, false, false);
      blockFromHFile.sanityCheck();
      return new HFileBlockWrapper(blockFromHFile, blockFromHFile.unpack(meta, hbr));
    }
  }

  private int createTestBlock(Path path) throws IOException {
    HFileContext meta =
      new HFileContextBuilder().withCompression(Compression.Algorithm.GZ).withIncludesMvcc(false)
        .withIncludesTags(false).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();

    int totalSize;
    try (FSDataOutputStream os = fs.create(path)) {
      HFileBlock.Writer hbw =
        new HFileBlock.Writer(TEST_UTIL.getConfiguration(), NoOpDataBlockEncoder.INSTANCE, meta);
      hbw.startWriting(BlockType.DATA);
      writeTestKeyValues(hbw, MIN_ALLOCATION_SIZE - 1);
      hbw.writeHeaderAndData(os);
      totalSize = hbw.getOnDiskSizeWithHeader();
      assertTrue(
        "expected generated block size " + totalSize + " to be less than " + MIN_ALLOCATION_SIZE,
        totalSize < MIN_ALLOCATION_SIZE);
    }
    return totalSize;
  }

  static int writeTestKeyValues(HFileBlock.Writer hbw, int desiredSize) throws IOException {
    Random random = new Random(42);

    byte[] family = new byte[] { 1 };
    int rowKey = 0;
    int qualifier = 0;
    int value = 0;
    long timestamp = 0;

    int totalSize = 0;

    // go until just up to the limit. compression should bring the total on-disk size under
    while (totalSize < desiredSize) {
      rowKey = maybeIncrement(random, rowKey);
      qualifier = maybeIncrement(random, qualifier);
      value = maybeIncrement(random, value);
      timestamp = maybeIncrement(random, (int) timestamp);

      KeyValue keyValue = new KeyValue(Bytes.toBytes(rowKey), family, Bytes.toBytes(qualifier),
        timestamp, Bytes.toBytes(value));
      hbw.write(keyValue);
      totalSize += keyValue.getLength();
    }

    return totalSize;
  }

  private static int maybeIncrement(Random random, int value) {
    if (random.nextFloat() < CHANCE_TO_REPEAT) {
      return value;
    }
    return value + 1;
  }

}