001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.List; 025import java.util.Random; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FSDataInputStream; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.ArrayBackedTag; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.CellComparator; 033import org.apache.hadoop.hbase.CellComparatorImpl; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.KeyValue; 038import org.apache.hadoop.hbase.Tag; 039import org.apache.hadoop.hbase.io.ByteBuffAllocator; 040import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 041import org.apache.hadoop.hbase.io.compress.Compression; 042import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder; 043import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 044import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; 045import org.apache.hadoop.hbase.nio.ByteBuff; 046import org.apache.hadoop.hbase.testclassification.IOTests; 047import org.apache.hadoop.hbase.testclassification.MediumTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.Writables; 050import org.apache.hadoop.io.Text; 051import org.junit.Assert; 052import org.junit.Before; 053import org.junit.ClassRule; 054import org.junit.Test; 055import org.junit.experimental.categories.Category; 056import org.junit.runner.RunWith; 057import org.junit.runners.Parameterized; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061/** 062 * Testing writing a version 3 {@link HFile} for all encoded blocks 063 */ 064@RunWith(Parameterized.class) 065@Category({ IOTests.class, MediumTests.class }) 066public class TestHFileWriterV3WithDataEncoders { 067 068 @ClassRule 069 public static final HBaseClassTestRule CLASS_RULE = 070 HBaseClassTestRule.forClass(TestHFileWriterV3WithDataEncoders.class); 071 072 private static final Logger LOG = 073 LoggerFactory.getLogger(TestHFileWriterV3WithDataEncoders.class); 074 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 075 private static final Random RNG = new Random(9713312); // Just a fixed seed. 076 077 private Configuration conf; 078 private FileSystem fs; 079 private boolean useTags; 080 private DataBlockEncoding dataBlockEncoding; 081 082 public TestHFileWriterV3WithDataEncoders(boolean useTags, DataBlockEncoding dataBlockEncoding) { 083 this.useTags = useTags; 084 this.dataBlockEncoding = dataBlockEncoding; 085 } 086 087 @Parameterized.Parameters 088 public static Collection<Object[]> parameters() { 089 DataBlockEncoding[] dataBlockEncodings = DataBlockEncoding.values(); 090 Object[][] params = new Object[dataBlockEncodings.length * 2 - 2][]; 091 int i = 0; 092 for (DataBlockEncoding dataBlockEncoding : dataBlockEncodings) { 093 if (dataBlockEncoding == DataBlockEncoding.NONE) { 094 continue; 095 } 096 params[i++] = new Object[] { false, dataBlockEncoding }; 097 params[i++] = new Object[] { true, dataBlockEncoding }; 098 } 099 return Arrays.asList(params); 100 } 101 102 @Before 103 public void setUp() throws IOException { 104 conf = TEST_UTIL.getConfiguration(); 105 fs = FileSystem.get(conf); 106 } 107 108 @Test 109 public void testHFileFormatV3() throws IOException { 110 testHFileFormatV3Internals(useTags); 111 } 112 113 private void testHFileFormatV3Internals(boolean useTags) throws IOException { 114 Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testHFileFormatV3"); 115 final Compression.Algorithm compressAlgo = Compression.Algorithm.GZ; 116 final int entryCount = 10000; 117 writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, false, useTags); 118 } 119 120 @Test 121 public void testMidKeyInHFile() throws IOException { 122 testMidKeyInHFileInternals(useTags); 123 } 124 125 private void testMidKeyInHFileInternals(boolean useTags) throws IOException { 126 Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testMidKeyInHFile"); 127 Compression.Algorithm compressAlgo = Compression.Algorithm.NONE; 128 int entryCount = 50000; 129 writeDataAndReadFromHFile(hfilePath, compressAlgo, entryCount, true, useTags); 130 } 131 132 private void writeDataAndReadFromHFile(Path hfilePath, Compression.Algorithm compressAlgo, 133 int entryCount, boolean findMidKey, boolean useTags) throws IOException { 134 135 HFileContext context = new HFileContextBuilder().withBlockSize(4096).withIncludesTags(useTags) 136 .withDataBlockEncoding(dataBlockEncoding).withCellComparator(CellComparatorImpl.COMPARATOR) 137 .withCompression(compressAlgo).build(); 138 CacheConfig cacheConfig = new CacheConfig(conf); 139 HFile.Writer writer = new HFile.WriterFactory(conf, cacheConfig).withPath(fs, hfilePath) 140 .withFileContext(context).create(); 141 142 List<KeyValue> keyValues = new ArrayList<>(entryCount); 143 writeKeyValues(entryCount, useTags, writer, RNG, keyValues); 144 145 FSDataInputStream fsdis = fs.open(hfilePath); 146 147 long fileSize = fs.getFileStatus(hfilePath).getLen(); 148 FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, fileSize); 149 150 Assert.assertEquals(3, trailer.getMajorVersion()); 151 Assert.assertEquals(entryCount, trailer.getEntryCount()); 152 HFileContext meta = new HFileContextBuilder().withCompression(compressAlgo) 153 .withIncludesMvcc(true).withIncludesTags(useTags).withDataBlockEncoding(dataBlockEncoding) 154 .withHBaseCheckSum(true).build(); 155 ReaderContext readerContext = 156 new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(fsdis)) 157 .withFilePath(hfilePath).withFileSystem(fs).withFileSize(fileSize).build(); 158 HFileBlock.FSReader blockReader = 159 new HFileBlock.FSReaderImpl(readerContext, meta, ByteBuffAllocator.HEAP, conf); 160 // Comparator class name is stored in the trailer in version 3. 161 CellComparator comparator = trailer.createComparator(); 162 HFileBlockIndex.BlockIndexReader dataBlockIndexReader = 163 new HFileBlockIndex.CellBasedKeyBlockIndexReaderV2(comparator, 164 trailer.getNumDataIndexLevels()); 165 HFileBlockIndex.BlockIndexReader metaBlockIndexReader = 166 new HFileBlockIndex.ByteArrayKeyBlockIndexReader(1); 167 168 HFileBlock.BlockIterator blockIter = blockReader.blockRange(trailer.getLoadOnOpenDataOffset(), 169 fileSize - trailer.getTrailerSize()); 170 // Data index. We also read statistics about the block index written after 171 // the root level. 172 dataBlockIndexReader.readMultiLevelIndexRoot( 173 blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getDataIndexCount()); 174 175 FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, hfilePath); 176 readerContext = new ReaderContextBuilder().withFilePath(hfilePath).withFileSize(fileSize) 177 .withFileSystem(wrapper.getHfs()).withInputStreamWrapper(wrapper).build(); 178 HFileInfo hfile = new HFileInfo(readerContext, conf); 179 HFile.Reader reader = new HFilePreadReader(readerContext, hfile, cacheConfig, conf); 180 hfile.initMetaAndIndex(reader); 181 if (findMidKey) { 182 Cell midkey = dataBlockIndexReader.midkey(reader); 183 Assert.assertNotNull("Midkey should not be null", midkey); 184 } 185 186 // Meta index. 187 metaBlockIndexReader.readRootIndex( 188 blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX).getByteStream(), 189 trailer.getMetaIndexCount()); 190 // File info 191 HFileInfo fileInfo = new HFileInfo(); 192 fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream()); 193 byte[] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION); 194 boolean includeMemstoreTS = 195 keyValueFormatVersion != null && Bytes.toInt(keyValueFormatVersion) > 0; 196 197 // Counters for the number of key/value pairs and the number of blocks 198 int entriesRead = 0; 199 int blocksRead = 0; 200 long memstoreTS = 0; 201 202 DataBlockEncoder encoder = dataBlockEncoding.getEncoder(); 203 long curBlockPos = scanBlocks(entryCount, context, keyValues, fsdis, trailer, meta, blockReader, 204 entriesRead, blocksRead, encoder); 205 206 // Meta blocks. We can scan until the load-on-open data offset (which is 207 // the root block index offset in version 2) because we are not testing 208 // intermediate-level index blocks here. 209 210 int metaCounter = 0; 211 while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) { 212 LOG.info("Current offset: {}, scanning until {}", fsdis.getPos(), 213 trailer.getLoadOnOpenDataOffset()); 214 HFileBlock block = 215 blockReader.readBlockData(curBlockPos, -1, false, false, true).unpack(context, blockReader); 216 Assert.assertEquals(BlockType.META, block.getBlockType()); 217 Text t = new Text(); 218 ByteBuff buf = block.getBufferWithoutHeader(); 219 if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) { 220 throw new IOException( 221 "Failed to deserialize block " + this + " into a " + t.getClass().getSimpleName()); 222 } 223 Text expectedText = (metaCounter == 0 ? new Text("Paris") 224 : metaCounter == 1 ? new Text("Moscow") 225 : new Text("Washington, D.C.")); 226 Assert.assertEquals(expectedText, t); 227 LOG.info("Read meta block data: " + t); 228 ++metaCounter; 229 curBlockPos += block.getOnDiskSizeWithHeader(); 230 } 231 232 fsdis.close(); 233 reader.close(); 234 } 235 236 private long scanBlocks(int entryCount, HFileContext context, List<KeyValue> keyValues, 237 FSDataInputStream fsdis, FixedFileTrailer trailer, HFileContext meta, 238 HFileBlock.FSReader blockReader, int entriesRead, int blocksRead, DataBlockEncoder encoder) 239 throws IOException { 240 // Scan blocks the way the reader would scan them 241 fsdis.seek(0); 242 long curBlockPos = 0; 243 while (curBlockPos <= trailer.getLastDataBlockOffset()) { 244 HFileBlockDecodingContext ctx = blockReader.getBlockDecodingContext(); 245 HFileBlock block = 246 blockReader.readBlockData(curBlockPos, -1, false, false, true).unpack(context, blockReader); 247 Assert.assertEquals(BlockType.ENCODED_DATA, block.getBlockType()); 248 ByteBuff origBlock = block.getBufferReadOnly(); 249 int pos = block.headerSize() + DataBlockEncoding.ID_SIZE; 250 origBlock.position(pos); 251 origBlock.limit(pos + block.getUncompressedSizeWithoutHeader() - DataBlockEncoding.ID_SIZE); 252 ByteBuff buf = origBlock.slice(); 253 DataBlockEncoder.EncodedSeeker seeker = 254 encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta)); 255 seeker.setCurrentBuffer(buf); 256 Cell res = seeker.getCell(); 257 KeyValue kv = keyValues.get(entriesRead); 258 Assert.assertEquals(0, CellComparatorImpl.COMPARATOR.compare(res, kv)); 259 ++entriesRead; 260 while (seeker.next()) { 261 res = seeker.getCell(); 262 kv = keyValues.get(entriesRead); 263 Assert.assertEquals(0, CellComparatorImpl.COMPARATOR.compare(res, kv)); 264 ++entriesRead; 265 } 266 ++blocksRead; 267 curBlockPos += block.getOnDiskSizeWithHeader(); 268 } 269 LOG.info("Finished reading: entries={}, blocksRead = {}", entriesRead, blocksRead); 270 Assert.assertEquals(entryCount, entriesRead); 271 return curBlockPos; 272 } 273 274 private void writeKeyValues(int entryCount, boolean useTags, HFile.Writer writer, Random rand, 275 List<KeyValue> keyValues) throws IOException { 276 277 for (int i = 0; i < entryCount; ++i) { 278 byte[] keyBytes = RandomKeyValueUtil.randomOrderedKey(rand, i); 279 280 // A random-length random value. 281 byte[] valueBytes = RandomKeyValueUtil.randomValue(rand); 282 KeyValue keyValue = null; 283 if (useTags) { 284 ArrayList<Tag> tags = new ArrayList<>(); 285 for (int j = 0; j < 1 + rand.nextInt(4); j++) { 286 byte[] tagBytes = new byte[16]; 287 rand.nextBytes(tagBytes); 288 tags.add(new ArrayBackedTag((byte) 1, tagBytes)); 289 } 290 keyValue = 291 new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP, valueBytes, tags); 292 } else { 293 keyValue = new KeyValue(keyBytes, null, null, HConstants.LATEST_TIMESTAMP, valueBytes); 294 } 295 writer.append(keyValue); 296 keyValues.add(keyValue); 297 } 298 299 // Add in an arbitrary order. They will be sorted lexicographically by 300 // the key. 301 writer.appendMetaBlock("CAPITAL_OF_USA", new Text("Washington, D.C.")); 302 writer.appendMetaBlock("CAPITAL_OF_RUSSIA", new Text("Moscow")); 303 writer.appendMetaBlock("CAPITAL_OF_FRANCE", new Text("Paris")); 304 305 writer.close(); 306 } 307 308}