001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP; 021import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ; 022import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE; 023import static org.junit.Assert.*; 024 025import java.io.ByteArrayOutputStream; 026import java.io.DataOutputStream; 027import java.io.IOException; 028import java.io.OutputStream; 029import java.nio.ByteBuffer; 030import java.util.ArrayList; 031import java.util.Collection; 032import java.util.Collections; 033import java.util.HashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.Optional; 037import java.util.Random; 038import java.util.concurrent.Callable; 039import java.util.concurrent.ExecutionException; 040import java.util.concurrent.Executor; 041import java.util.concurrent.ExecutorCompletionService; 042import java.util.concurrent.Executors; 043import java.util.concurrent.Future; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.fs.FSDataInputStream; 046import org.apache.hadoop.fs.FSDataOutputStream; 047import org.apache.hadoop.fs.FileSystem; 048import org.apache.hadoop.fs.Path; 049import org.apache.hadoop.hbase.ArrayBackedTag; 050import org.apache.hadoop.hbase.CellComparatorImpl; 051import org.apache.hadoop.hbase.CellUtil; 052import org.apache.hadoop.hbase.HBaseClassTestRule; 053import org.apache.hadoop.hbase.HBaseConfiguration; 054import org.apache.hadoop.hbase.HBaseTestingUtil; 055import org.apache.hadoop.hbase.HConstants; 056import org.apache.hadoop.hbase.KeyValue; 057import org.apache.hadoop.hbase.Tag; 058import org.apache.hadoop.hbase.fs.HFileSystem; 059import org.apache.hadoop.hbase.io.ByteBuffAllocator; 060import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 061import org.apache.hadoop.hbase.io.compress.Compression; 062import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 063import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 064import org.apache.hadoop.hbase.nio.ByteBuff; 065import org.apache.hadoop.hbase.nio.MultiByteBuff; 066import org.apache.hadoop.hbase.nio.SingleByteBuff; 067import org.apache.hadoop.hbase.testclassification.IOTests; 068import org.apache.hadoop.hbase.testclassification.LargeTests; 069import org.apache.hadoop.hbase.util.Bytes; 070import org.apache.hadoop.hbase.util.ChecksumType; 071import org.apache.hadoop.hbase.util.ClassSize; 072import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 073import org.apache.hadoop.io.WritableUtils; 074import org.apache.hadoop.io.compress.Compressor; 075import org.junit.After; 076import org.junit.Before; 077import org.junit.ClassRule; 078import org.junit.Test; 079import org.junit.experimental.categories.Category; 080import org.junit.runner.RunWith; 081import org.junit.runners.Parameterized; 082import org.junit.runners.Parameterized.Parameters; 083import org.mockito.Mockito; 084import org.slf4j.Logger; 085import org.slf4j.LoggerFactory; 086 087@Category({ IOTests.class, LargeTests.class }) 088@RunWith(Parameterized.class) 089public class TestHFileBlock { 090 091 @ClassRule 092 public static final HBaseClassTestRule CLASS_RULE = 093 HBaseClassTestRule.forClass(TestHFileBlock.class); 094 095 // change this value to activate more logs 096 private static final boolean detailedLogging = false; 097 private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true }; 098 099 private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlock.class); 100 101 static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ }; 102 103 private static final int NUM_TEST_BLOCKS = 1000; 104 private static final int NUM_READER_THREADS = 26; 105 private static final int MAX_BUFFER_COUNT = 2048; 106 107 // Used to generate KeyValues 108 private static int NUM_KEYVALUES = 50; 109 private static int FIELD_LENGTH = 10; 110 private static float CHANCE_TO_REPEAT = 0.6f; 111 112 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 113 private static final Random RNG = new Random(); // This test depends on Random#setSeed 114 private FileSystem fs; 115 116 private final boolean includesMemstoreTS; 117 private final boolean includesTag; 118 private final boolean useHeapAllocator; 119 private final ByteBuffAllocator alloc; 120 121 public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag, boolean useHeapAllocator) { 122 this.includesMemstoreTS = includesMemstoreTS; 123 this.includesTag = includesTag; 124 this.useHeapAllocator = useHeapAllocator; 125 this.alloc = useHeapAllocator ? HEAP : createOffHeapAlloc(); 126 assertAllocator(); 127 } 128 129 @Parameters 130 public static Collection<Object[]> parameters() { 131 List<Object[]> params = new ArrayList<>(); 132 // Generate boolean triples from 000 to 111 133 for (int i = 0; i < (1 << 3); i++) { 134 Object[] flags = new Boolean[3]; 135 for (int k = 0; k < 3; k++) { 136 flags[k] = (i & (1 << k)) != 0; 137 } 138 params.add(flags); 139 } 140 return params; 141 } 142 143 private ByteBuffAllocator createOffHeapAlloc() { 144 Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 145 conf.setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, MAX_BUFFER_COUNT); 146 conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0); 147 ByteBuffAllocator alloc = ByteBuffAllocator.create(conf, true); 148 // Fill the allocator 149 List<ByteBuff> bufs = new ArrayList<>(); 150 for (int i = 0; i < MAX_BUFFER_COUNT; i++) { 151 ByteBuff bb = alloc.allocateOneBuffer(); 152 assertTrue(!bb.hasArray()); 153 bufs.add(bb); 154 } 155 bufs.forEach(ByteBuff::release); 156 return alloc; 157 } 158 159 private void assertAllocator() { 160 if (!useHeapAllocator) { 161 assertEquals(MAX_BUFFER_COUNT, alloc.getFreeBufferCount()); 162 } 163 } 164 165 @Before 166 public void setUp() throws IOException { 167 fs = HFileSystem.get(TEST_UTIL.getConfiguration()); 168 } 169 170 @After 171 public void tearDown() throws IOException { 172 assertAllocator(); 173 alloc.clean(); 174 } 175 176 static void writeTestBlockContents(DataOutputStream dos) throws IOException { 177 // This compresses really well. 178 for (int i = 0; i < 1000; ++i) 179 dos.writeInt(i / 100); 180 } 181 182 static int writeTestKeyValues(HFileBlock.Writer hbw, int seed, boolean includesMemstoreTS, 183 boolean useTag) throws IOException { 184 List<KeyValue> keyValues = new ArrayList<>(); 185 186 // generate keyValues 187 RNG.setSeed(42); // just any fixed number 188 for (int i = 0; i < NUM_KEYVALUES; ++i) { 189 byte[] row; 190 long timestamp; 191 byte[] family; 192 byte[] qualifier; 193 byte[] value; 194 195 // generate it or repeat, it should compress well 196 if (0 < i && RNG.nextFloat() < CHANCE_TO_REPEAT) { 197 row = CellUtil.cloneRow(keyValues.get(RNG.nextInt(keyValues.size()))); 198 } else { 199 row = new byte[FIELD_LENGTH]; 200 RNG.nextBytes(row); 201 } 202 if (0 == i) { 203 family = new byte[FIELD_LENGTH]; 204 RNG.nextBytes(family); 205 } else { 206 family = CellUtil.cloneFamily(keyValues.get(0)); 207 } 208 if (0 < i && RNG.nextFloat() < CHANCE_TO_REPEAT) { 209 qualifier = CellUtil.cloneQualifier(keyValues.get(RNG.nextInt(keyValues.size()))); 210 } else { 211 qualifier = new byte[FIELD_LENGTH]; 212 RNG.nextBytes(qualifier); 213 } 214 if (0 < i && RNG.nextFloat() < CHANCE_TO_REPEAT) { 215 value = CellUtil.cloneValue(keyValues.get(RNG.nextInt(keyValues.size()))); 216 } else { 217 value = new byte[FIELD_LENGTH]; 218 RNG.nextBytes(value); 219 } 220 if (0 < i && RNG.nextFloat() < CHANCE_TO_REPEAT) { 221 timestamp = keyValues.get(RNG.nextInt(keyValues.size())).getTimestamp(); 222 } else { 223 timestamp = RNG.nextLong(); 224 } 225 if (!useTag) { 226 keyValues.add(new KeyValue(row, family, qualifier, timestamp, value)); 227 } else { 228 keyValues.add(new KeyValue(row, family, qualifier, timestamp, value, 229 new Tag[] { new ArrayBackedTag((byte) 1, Bytes.toBytes("myTagVal")) })); 230 } 231 } 232 233 // sort it and write to stream 234 int totalSize = 0; 235 Collections.sort(keyValues, CellComparatorImpl.COMPARATOR); 236 237 for (KeyValue kv : keyValues) { 238 totalSize += kv.getLength(); 239 if (includesMemstoreTS) { 240 long memstoreTS = RNG.nextLong(); 241 kv.setSequenceId(memstoreTS); 242 totalSize += WritableUtils.getVIntSize(memstoreTS); 243 } 244 hbw.write(kv); 245 } 246 return totalSize; 247 } 248 249 public byte[] createTestV1Block(Compression.Algorithm algo) throws IOException { 250 Compressor compressor = algo.getCompressor(); 251 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 252 OutputStream os = algo.createCompressionStream(baos, compressor, 0); 253 DataOutputStream dos = new DataOutputStream(os); 254 BlockType.META.write(dos); // Let's make this a meta block. 255 writeTestBlockContents(dos); 256 dos.flush(); 257 algo.returnCompressor(compressor); 258 return baos.toByteArray(); 259 } 260 261 static HFileBlock.Writer createTestV2Block(Compression.Algorithm algo, boolean includesMemstoreTS, 262 boolean includesTag) throws IOException { 263 final BlockType blockType = BlockType.DATA; 264 HFileContext meta = new HFileContextBuilder().withCompression(algo) 265 .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag) 266 .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build(); 267 HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta); 268 DataOutputStream dos = hbw.startWriting(blockType); 269 writeTestBlockContents(dos); 270 dos.flush(); 271 hbw.ensureBlockReady(); 272 assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader()); 273 hbw.release(); 274 return hbw; 275 } 276 277 public String createTestBlockStr(Compression.Algorithm algo, int correctLength, boolean useTag) 278 throws IOException { 279 HFileBlock.Writer hbw = createTestV2Block(algo, includesMemstoreTS, useTag); 280 byte[] testV2Block = hbw.getHeaderAndDataForTest(); 281 int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE + 9; 282 if (testV2Block.length == correctLength) { 283 // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid 284 // variations across operating systems. 285 // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format. 286 // We only make this change when the compressed block length matches. 287 // Otherwise, there are obviously other inconsistencies. 288 testV2Block[osOffset] = 3; 289 } 290 return Bytes.toStringBinary(testV2Block); 291 } 292 293 @Test 294 public void testNoCompression() throws IOException { 295 CacheConfig cacheConf = Mockito.mock(CacheConfig.class); 296 Mockito.when(cacheConf.getBlockCache()).thenReturn(Optional.empty()); 297 298 HFileBlock block = 299 createTestV2Block(NONE, includesMemstoreTS, false).getBlockForCaching(cacheConf); 300 assertEquals(4000, block.getUncompressedSizeWithoutHeader()); 301 assertEquals(4004, block.getOnDiskSizeWithoutHeader()); 302 assertTrue(block.isUnpacked()); 303 } 304 305 @Test 306 public void testGzipCompression() throws IOException { 307 // @formatter:off 308 String correctTestBlockStr = "DATABLK*\\x00\\x00\\x00>\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF" 309 + "\\xFF\\xFF\\xFF\\xFF" 310 + "\\x0" + ChecksumType.getDefaultChecksumType().getCode() 311 + "\\x00\\x00@\\x00\\x00\\x00\\x00[" 312 // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html 313 + "\\x1F\\x8B" // gzip magic signature 314 + "\\x08" // Compression method: 8 = "deflate" 315 + "\\x00" // Flags 316 + "\\x00\\x00\\x00\\x00" // mtime 317 + "\\x00" // XFL (extra flags) 318 // OS (0 = FAT filesystems, 3 = Unix). However, this field 319 // sometimes gets set to 0 on Linux and Mac, so we reset it to 3. 320 // This appears to be a difference caused by the availability 321 // (and use) of the native GZ codec. 322 + "\\x03" 323 + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa" 324 + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c" 325 + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00" 326 + "\\x00\\x00\\x00\\x00"; // 4 byte checksum (ignored) 327 // @formatter:on 328 int correctGzipBlockLength = 95; 329 String testBlockStr = createTestBlockStr(GZ, correctGzipBlockLength, false); 330 // We ignore the block checksum because createTestBlockStr can change the 331 // gzip header after the block is produced 332 assertEquals(correctTestBlockStr.substring(0, correctGzipBlockLength - 4), 333 testBlockStr.substring(0, correctGzipBlockLength - 4)); 334 } 335 336 @Test 337 public void testReaderV2() throws IOException { 338 testReaderV2Internals(); 339 } 340 341 private void assertRelease(HFileBlock blk) { 342 if (blk instanceof ExclusiveMemHFileBlock) { 343 assertFalse(blk.release()); 344 } else { 345 assertTrue(blk.release()); 346 } 347 } 348 349 protected void testReaderV2Internals() throws IOException { 350 final Configuration conf = TEST_UTIL.getConfiguration(); 351 if (includesTag) { 352 conf.setInt("hfile.format.version", 3); 353 } 354 for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { 355 for (boolean pread : new boolean[] { false, true }) { 356 LOG.info("testReaderV2: Compression algorithm: " + algo + ", pread=" + pread); 357 Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo); 358 FSDataOutputStream os = fs.create(path); 359 HFileContext meta = new HFileContextBuilder().withCompression(algo) 360 .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag) 361 .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build(); 362 HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, meta); 363 long totalSize = 0; 364 for (int blockId = 0; blockId < 2; ++blockId) { 365 DataOutputStream dos = hbw.startWriting(BlockType.DATA); 366 for (int i = 0; i < 1234; ++i) 367 dos.writeInt(i); 368 hbw.writeHeaderAndData(os); 369 totalSize += hbw.getOnDiskSizeWithHeader(); 370 } 371 os.close(); 372 373 FSDataInputStream is = fs.open(path); 374 meta = 375 new HFileContextBuilder().withHBaseCheckSum(true).withIncludesMvcc(includesMemstoreTS) 376 .withIncludesTags(includesTag).withCompression(algo).build(); 377 ReaderContext context = 378 new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is)) 379 .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build(); 380 HFileBlock.FSReader hbr = 381 new HFileBlock.FSReaderImpl(context, meta, alloc, TEST_UTIL.getConfiguration()); 382 HFileBlock b = hbr.readBlockData(0, -1, pread, false, true); 383 is.close(); 384 assertEquals(0, HFile.getAndResetChecksumFailuresCount()); 385 386 b.sanityCheck(); 387 assertEquals(4936, b.getUncompressedSizeWithoutHeader()); 388 assertEquals(algo == GZ ? 2173 : 4936, 389 b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); 390 HFileBlock expected = b; 391 392 if (algo == GZ) { 393 is = fs.open(path); 394 ReaderContext readerContext = 395 new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is)) 396 .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build(); 397 hbr = 398 new HFileBlock.FSReaderImpl(readerContext, meta, alloc, TEST_UTIL.getConfiguration()); 399 b = hbr.readBlockData(0, 400 2173 + HConstants.HFILEBLOCK_HEADER_SIZE + b.totalChecksumBytes(), pread, false, true); 401 assertEquals(expected, b); 402 int wrongCompressedSize = 2172; 403 try { 404 hbr.readBlockData(0, wrongCompressedSize + HConstants.HFILEBLOCK_HEADER_SIZE, pread, 405 false, true); 406 fail("Exception expected"); 407 } catch (IOException ex) { 408 String expectedPrefix = "Passed in onDiskSizeWithHeader="; 409 assertTrue( 410 "Invalid exception message: '" + ex.getMessage() 411 + "'.\nMessage is expected to start with: '" + expectedPrefix + "'", 412 ex.getMessage().startsWith(expectedPrefix)); 413 } 414 assertRelease(b); 415 is.close(); 416 } 417 assertRelease(expected); 418 } 419 } 420 } 421 422 /** 423 * Test encoding/decoding data blocks. 424 * @throws IOException a bug or a problem with temporary files. 425 */ 426 @Test 427 public void testDataBlockEncoding() throws IOException { 428 testInternals(); 429 } 430 431 private void testInternals() throws IOException { 432 final int numBlocks = 5; 433 final Configuration conf = TEST_UTIL.getConfiguration(); 434 if (includesTag) { 435 conf.setInt("hfile.format.version", 3); 436 } 437 for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { 438 for (boolean pread : new boolean[] { false, true }) { 439 for (DataBlockEncoding encoding : DataBlockEncoding.values()) { 440 LOG.info("testDataBlockEncoding: Compression algorithm={}, pread={}, dataBlockEncoder={}", 441 algo.toString(), pread, encoding); 442 Path path = 443 new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo + "_" + encoding.toString()); 444 FSDataOutputStream os = fs.create(path); 445 HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) 446 ? new HFileDataBlockEncoderImpl(encoding) 447 : NoOpDataBlockEncoder.INSTANCE; 448 HFileContext meta = new HFileContextBuilder().withCompression(algo) 449 .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag) 450 .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build(); 451 HFileBlock.Writer hbw = new HFileBlock.Writer(conf, dataBlockEncoder, meta); 452 long totalSize = 0; 453 final List<Integer> encodedSizes = new ArrayList<>(); 454 final List<ByteBuff> encodedBlocks = new ArrayList<>(); 455 for (int blockId = 0; blockId < numBlocks; ++blockId) { 456 hbw.startWriting(BlockType.DATA); 457 writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag); 458 hbw.writeHeaderAndData(os); 459 int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE; 460 ByteBuff encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader(); 461 final int encodedSize = encodedResultWithHeader.limit() - headerLen; 462 if (encoding != DataBlockEncoding.NONE) { 463 // We need to account for the two-byte encoding algorithm ID that 464 // comes after the 24-byte block header but before encoded KVs. 465 headerLen += DataBlockEncoding.ID_SIZE; 466 } 467 encodedSizes.add(encodedSize); 468 ByteBuff encodedBuf = encodedResultWithHeader.position(headerLen).slice(); 469 encodedBlocks.add(encodedBuf); 470 totalSize += hbw.getOnDiskSizeWithHeader(); 471 } 472 os.close(); 473 474 FSDataInputStream is = fs.open(path); 475 meta = new HFileContextBuilder().withHBaseCheckSum(true).withCompression(algo) 476 .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag).build(); 477 ReaderContext context = 478 new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is)) 479 .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build(); 480 HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(context, meta, alloc, conf); 481 hbr.setDataBlockEncoder(dataBlockEncoder, conf); 482 hbr.setIncludesMemStoreTS(includesMemstoreTS); 483 HFileBlock blockFromHFile, blockUnpacked; 484 int pos = 0; 485 for (int blockId = 0; blockId < numBlocks; ++blockId) { 486 blockFromHFile = hbr.readBlockData(pos, -1, pread, false, true); 487 assertEquals(0, HFile.getAndResetChecksumFailuresCount()); 488 blockFromHFile.sanityCheck(); 489 pos += blockFromHFile.getOnDiskSizeWithHeader(); 490 assertEquals((int) encodedSizes.get(blockId), 491 blockFromHFile.getUncompressedSizeWithoutHeader()); 492 assertEquals(meta.isCompressedOrEncrypted(), !blockFromHFile.isUnpacked()); 493 long packedHeapsize = blockFromHFile.heapSize(); 494 blockUnpacked = blockFromHFile.unpack(meta, hbr); 495 assertTrue(blockUnpacked.isUnpacked()); 496 if (meta.isCompressedOrEncrypted()) { 497 LOG.info("packedHeapsize=" + packedHeapsize + ", unpackedHeadsize=" 498 + blockUnpacked.heapSize()); 499 assertFalse(packedHeapsize == blockUnpacked.heapSize()); 500 assertTrue("Packed heapSize should be < unpacked heapSize", 501 packedHeapsize < blockUnpacked.heapSize()); 502 } 503 ByteBuff actualBuffer = blockUnpacked.getBufferWithoutHeader(); 504 if (encoding != DataBlockEncoding.NONE) { 505 // We expect a two-byte big-endian encoding id. 506 assertEquals( 507 "Unexpected first byte with " + buildMessageDetails(algo, encoding, pread), 508 Long.toHexString(0), Long.toHexString(actualBuffer.get(0))); 509 assertEquals( 510 "Unexpected second byte with " + buildMessageDetails(algo, encoding, pread), 511 Long.toHexString(encoding.getId()), Long.toHexString(actualBuffer.get(1))); 512 actualBuffer.position(2); 513 actualBuffer = actualBuffer.slice(); 514 } 515 516 ByteBuff expectedBuff = encodedBlocks.get(blockId); 517 expectedBuff.rewind(); 518 519 // test if content matches, produce nice message 520 assertBuffersEqual(expectedBuff, actualBuffer, algo, encoding, pread); 521 522 // test serialized blocks 523 for (boolean reuseBuffer : new boolean[] { false, true }) { 524 ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength()); 525 blockFromHFile.serialize(serialized, true); 526 HFileBlock deserialized = (HFileBlock) blockFromHFile.getDeserializer() 527 .deserialize(new SingleByteBuff(serialized), HEAP); 528 assertEquals("Serialization did not preserve block state. reuseBuffer=" + reuseBuffer, 529 blockFromHFile, deserialized); 530 // intentional reference comparison 531 if (blockFromHFile != blockUnpacked) { 532 assertEquals("Deserialized block cannot be unpacked correctly.", blockUnpacked, 533 deserialized.unpack(meta, hbr)); 534 } 535 } 536 assertRelease(blockUnpacked); 537 if (blockFromHFile != blockUnpacked) { 538 blockFromHFile.release(); 539 } 540 } 541 is.close(); 542 } 543 } 544 } 545 } 546 547 static String buildMessageDetails(Algorithm compression, DataBlockEncoding encoding, 548 boolean pread) { 549 return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread); 550 } 551 552 static void assertBuffersEqual(ByteBuff expectedBuffer, ByteBuff actualBuffer, 553 Compression.Algorithm compression, DataBlockEncoding encoding, boolean pread) { 554 if (!actualBuffer.equals(expectedBuffer)) { 555 int prefix = 0; 556 int minLimit = Math.min(expectedBuffer.limit(), actualBuffer.limit()); 557 while (prefix < minLimit && expectedBuffer.get(prefix) == actualBuffer.get(prefix)) { 558 prefix++; 559 } 560 561 fail(String.format("Content mismatch for %s, commonPrefix %d, expected %s, got %s", 562 buildMessageDetails(compression, encoding, pread), prefix, 563 nextBytesToStr(expectedBuffer, prefix), nextBytesToStr(actualBuffer, prefix))); 564 } 565 } 566 567 /** 568 * Convert a few next bytes in the given buffer at the given position to string. Used for error 569 * messages. 570 */ 571 private static String nextBytesToStr(ByteBuff buf, int pos) { 572 int maxBytes = buf.limit() - pos; 573 int numBytes = Math.min(16, maxBytes); 574 return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos, numBytes) 575 + (numBytes < maxBytes ? "..." : ""); 576 } 577 578 @Test 579 public void testPreviousOffset() throws IOException { 580 testPreviousOffsetInternals(); 581 } 582 583 protected void testPreviousOffsetInternals() throws IOException { 584 // TODO: parameterize these nested loops. 585 Configuration conf = TEST_UTIL.getConfiguration(); 586 for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { 587 for (boolean pread : BOOLEAN_VALUES) { 588 for (boolean cacheOnWrite : BOOLEAN_VALUES) { 589 Random rand = defaultRandom(); 590 LOG.info("testPreviousOffset: Compression algorithm={}, pread={}, cacheOnWrite={}", 591 algo.toString(), pread, cacheOnWrite); 592 Path path = new Path(TEST_UTIL.getDataTestDir(), "prev_offset"); 593 List<Long> expectedOffsets = new ArrayList<>(); 594 List<Long> expectedPrevOffsets = new ArrayList<>(); 595 List<BlockType> expectedTypes = new ArrayList<>(); 596 List<ByteBuffer> expectedContents = cacheOnWrite ? new ArrayList<>() : null; 597 long totalSize = writeBlocks(TEST_UTIL.getConfiguration(), rand, algo, path, 598 expectedOffsets, expectedPrevOffsets, expectedTypes, expectedContents); 599 600 FSDataInputStream is = fs.open(path); 601 HFileContext meta = 602 new HFileContextBuilder().withHBaseCheckSum(true).withIncludesMvcc(includesMemstoreTS) 603 .withIncludesTags(includesTag).withCompression(algo).build(); 604 ReaderContext context = 605 new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is)) 606 .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build(); 607 HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc, conf); 608 long curOffset = 0; 609 for (int i = 0; i < NUM_TEST_BLOCKS; ++i) { 610 if (!pread) { 611 assertEquals(is.getPos(), 612 curOffset + (i == 0 ? 0 : HConstants.HFILEBLOCK_HEADER_SIZE)); 613 } 614 615 assertEquals(expectedOffsets.get(i).longValue(), curOffset); 616 if (detailedLogging) { 617 LOG.info("Reading block #" + i + " at offset " + curOffset); 618 } 619 HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false, false); 620 if (detailedLogging) { 621 LOG.info("Block #" + i + ": " + b); 622 } 623 assertEquals("Invalid block #" + i + "'s type:", expectedTypes.get(i), 624 b.getBlockType()); 625 assertEquals("Invalid previous block offset for block " + i + " of " + "type " 626 + b.getBlockType() + ":", (long) expectedPrevOffsets.get(i), b.getPrevBlockOffset()); 627 b.sanityCheck(); 628 assertEquals(curOffset, b.getOffset()); 629 630 // Now re-load this block knowing the on-disk size. This tests a 631 // different branch in the loader. 632 HFileBlock b2 = 633 hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false, false); 634 b2.sanityCheck(); 635 636 assertEquals(b.getBlockType(), b2.getBlockType()); 637 assertEquals(b.getOnDiskSizeWithoutHeader(), b2.getOnDiskSizeWithoutHeader()); 638 assertEquals(b.getOnDiskSizeWithHeader(), b2.getOnDiskSizeWithHeader()); 639 assertEquals(b.getUncompressedSizeWithoutHeader(), 640 b2.getUncompressedSizeWithoutHeader()); 641 assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset()); 642 assertEquals(curOffset, b2.getOffset()); 643 assertEquals(b.getBytesPerChecksum(), b2.getBytesPerChecksum()); 644 assertEquals(b.getOnDiskDataSizeWithHeader(), b2.getOnDiskDataSizeWithHeader()); 645 assertEquals(0, HFile.getAndResetChecksumFailuresCount()); 646 assertRelease(b2); 647 648 curOffset += b.getOnDiskSizeWithHeader(); 649 650 if (cacheOnWrite) { 651 // NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply 652 // verifies that the unpacked value read back off disk matches the unpacked value 653 // generated before writing to disk. 654 HFileBlock newBlock = b.unpack(meta, hbr); 655 // neither b's unpacked nor the expectedContents have checksum. 656 // they should be identical 657 ByteBuff bufRead = newBlock.getBufferReadOnly(); 658 ByteBuffer bufExpected = expectedContents.get(i); 659 byte[] bytesRead = bufRead.toBytes(); 660 boolean bytesAreCorrect = Bytes.compareTo(bytesRead, 0, bytesRead.length, 661 bufExpected.array(), bufExpected.arrayOffset(), bufExpected.limit()) == 0; 662 String wrongBytesMsg = ""; 663 664 if (!bytesAreCorrect) { 665 // Optimization: only construct an error message in case we 666 // will need it. 667 wrongBytesMsg = "Expected bytes in block #" + i + " (algo=" + algo + ", pread=" 668 + pread + ", cacheOnWrite=" + cacheOnWrite + "):\n"; 669 wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(), 670 bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit())) + ", actual:\n" 671 + Bytes.toStringBinary(bytesRead, 0, Math.min(32 + 10, bytesRead.length)); 672 if (detailedLogging) { 673 LOG.warn( 674 "expected header" + HFileBlock.toStringHeader(new SingleByteBuff(bufExpected)) 675 + "\nfound header" + HFileBlock.toStringHeader(bufRead)); 676 LOG.warn("bufread offset " + bufRead.arrayOffset() + " limit " + bufRead.limit() 677 + " expected offset " + bufExpected.arrayOffset() + " limit " 678 + bufExpected.limit()); 679 LOG.warn(wrongBytesMsg); 680 } 681 } 682 assertTrue(wrongBytesMsg, bytesAreCorrect); 683 assertRelease(newBlock); 684 if (newBlock != b) { 685 assertRelease(b); 686 } 687 } else { 688 assertRelease(b); 689 } 690 } 691 assertEquals(curOffset, fs.getFileStatus(path).getLen()); 692 is.close(); 693 } 694 } 695 } 696 } 697 698 private Random defaultRandom() { 699 return new Random(189237); 700 } 701 702 private class BlockReaderThread implements Callable<Boolean> { 703 private final String clientId; 704 private final HFileBlock.FSReader hbr; 705 private final List<Long> offsets; 706 private final List<BlockType> types; 707 private final long fileSize; 708 709 public BlockReaderThread(String clientId, HFileBlock.FSReader hbr, List<Long> offsets, 710 List<BlockType> types, long fileSize) { 711 this.clientId = clientId; 712 this.offsets = offsets; 713 this.hbr = hbr; 714 this.types = types; 715 this.fileSize = fileSize; 716 } 717 718 @Override 719 public Boolean call() throws Exception { 720 Random rand = new Random(clientId.hashCode()); 721 long endTime = EnvironmentEdgeManager.currentTime() + 10000; 722 int numBlocksRead = 0; 723 int numPositionalRead = 0; 724 int numWithOnDiskSize = 0; 725 while (EnvironmentEdgeManager.currentTime() < endTime) { 726 int blockId = rand.nextInt(NUM_TEST_BLOCKS); 727 long offset = offsets.get(blockId); 728 // now we only support concurrent read with pread = true 729 boolean pread = true; 730 boolean withOnDiskSize = rand.nextBoolean(); 731 long expectedSize = 732 (blockId == NUM_TEST_BLOCKS - 1 ? fileSize : offsets.get(blockId + 1)) - offset; 733 HFileBlock b = null; 734 try { 735 long onDiskSizeArg = withOnDiskSize ? expectedSize : -1; 736 b = hbr.readBlockData(offset, onDiskSizeArg, pread, false, false); 737 if (useHeapAllocator) { 738 assertTrue(!b.isSharedMem()); 739 } else { 740 assertTrue(!b.getBlockType().isData() || b.isSharedMem()); 741 } 742 assertEquals(types.get(blockId), b.getBlockType()); 743 assertEquals(expectedSize, b.getOnDiskSizeWithHeader()); 744 assertEquals(offset, b.getOffset()); 745 } catch (IOException ex) { 746 LOG.error("Error in client " + clientId + " trying to read block at " + offset 747 + ", pread=" + pread + ", withOnDiskSize=" + withOnDiskSize, ex); 748 return false; 749 } finally { 750 if (b != null) { 751 b.release(); 752 } 753 } 754 ++numBlocksRead; 755 if (pread) { 756 ++numPositionalRead; 757 } 758 759 if (withOnDiskSize) { 760 ++numWithOnDiskSize; 761 } 762 } 763 LOG 764 .info("Client " + clientId + " successfully read " + numBlocksRead + " blocks (with pread: " 765 + numPositionalRead + ", with onDiskSize " + "specified: " + numWithOnDiskSize + ")"); 766 767 return true; 768 } 769 } 770 771 @Test 772 public void testConcurrentReading() throws Exception { 773 testConcurrentReadingInternals(); 774 } 775 776 protected void testConcurrentReadingInternals() 777 throws IOException, InterruptedException, ExecutionException { 778 Configuration conf = TEST_UTIL.getConfiguration(); 779 for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) { 780 Path path = new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading"); 781 Random rand = defaultRandom(); 782 List<Long> offsets = new ArrayList<>(); 783 List<BlockType> types = new ArrayList<>(); 784 writeBlocks(TEST_UTIL.getConfiguration(), rand, compressAlgo, path, offsets, null, types, 785 null); 786 FSDataInputStream is = fs.open(path); 787 long fileSize = fs.getFileStatus(path).getLen(); 788 HFileContext meta = 789 new HFileContextBuilder().withHBaseCheckSum(true).withIncludesMvcc(includesMemstoreTS) 790 .withIncludesTags(includesTag).withCompression(compressAlgo).build(); 791 ReaderContext context = 792 new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is)) 793 .withFileSize(fileSize).withFilePath(path).withFileSystem(fs).build(); 794 HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc, conf); 795 796 Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS); 797 ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<>(exec); 798 799 for (int i = 0; i < NUM_READER_THREADS; ++i) { 800 ecs.submit( 801 new BlockReaderThread("reader_" + (char) ('A' + i), hbr, offsets, types, fileSize)); 802 } 803 804 for (int i = 0; i < NUM_READER_THREADS; ++i) { 805 Future<Boolean> result = ecs.take(); 806 assertTrue(result.get()); 807 if (detailedLogging) { 808 LOG.info(String.valueOf(i + 1) + " reader threads finished successfully (algo=" 809 + compressAlgo + ")"); 810 } 811 } 812 is.close(); 813 } 814 } 815 816 private long writeBlocks(Configuration conf, Random rand, Compression.Algorithm compressAlgo, 817 Path path, List<Long> expectedOffsets, List<Long> expectedPrevOffsets, 818 List<BlockType> expectedTypes, List<ByteBuffer> expectedContents) throws IOException { 819 boolean cacheOnWrite = expectedContents != null; 820 FSDataOutputStream os = fs.create(path); 821 HFileContext meta = new HFileContextBuilder().withHBaseCheckSum(true) 822 .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag) 823 .withCompression(compressAlgo).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build(); 824 HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, meta); 825 Map<BlockType, Long> prevOffsetByType = new HashMap<>(); 826 long totalSize = 0; 827 for (int i = 0; i < NUM_TEST_BLOCKS; ++i) { 828 long pos = os.getPos(); 829 int blockTypeOrdinal = rand.nextInt(BlockType.values().length); 830 if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) { 831 blockTypeOrdinal = BlockType.DATA.ordinal(); 832 } 833 BlockType bt = BlockType.values()[blockTypeOrdinal]; 834 DataOutputStream dos = hbw.startWriting(bt); 835 int size = rand.nextInt(500); 836 for (int j = 0; j < size; ++j) { 837 // This might compress well. 838 dos.writeShort(i + 1); 839 dos.writeInt(j + 1); 840 } 841 842 if (expectedOffsets != null) expectedOffsets.add(os.getPos()); 843 844 if (expectedPrevOffsets != null) { 845 Long prevOffset = prevOffsetByType.get(bt); 846 expectedPrevOffsets.add(prevOffset != null ? prevOffset : -1); 847 prevOffsetByType.put(bt, os.getPos()); 848 } 849 850 expectedTypes.add(bt); 851 852 hbw.writeHeaderAndData(os); 853 totalSize += hbw.getOnDiskSizeWithHeader(); 854 855 if (cacheOnWrite) { 856 ByteBuff buff = hbw.cloneUncompressedBufferWithHeader(); 857 expectedContents.add(buff.asSubByteBuffer(buff.capacity())); 858 } 859 860 if (detailedLogging) { 861 LOG.info("Written block #" + i + " of type " + bt + ", uncompressed size " 862 + hbw.getUncompressedSizeWithoutHeader() + ", packed size " 863 + hbw.getOnDiskSizeWithoutHeader() + " at offset " + pos); 864 } 865 } 866 os.close(); 867 LOG.info("Created a temporary file at " + path + ", " + fs.getFileStatus(path).getLen() 868 + " byte, compression=" + compressAlgo); 869 return totalSize; 870 } 871 872 @Test 873 public void testBlockHeapSize() { 874 testBlockHeapSizeInternals(); 875 } 876 877 protected void testBlockHeapSizeInternals() { 878 if (ClassSize.is32BitJVM()) { 879 assertEquals(64, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE); 880 } else { 881 assertEquals(80, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE); 882 } 883 884 for (int size : new int[] { 100, 256, 12345 }) { 885 byte[] byteArr = new byte[HConstants.HFILEBLOCK_HEADER_SIZE + size]; 886 ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size); 887 HFileContext meta = new HFileContextBuilder().withIncludesMvcc(includesMemstoreTS) 888 .withIncludesTags(includesTag).withHBaseCheckSum(false).withCompression(Algorithm.NONE) 889 .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).withChecksumType(ChecksumType.NULL) 890 .build(); 891 HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), 892 HFileBlock.FILL_HEADER, -1, 0, -1, meta, HEAP); 893 long byteBufferExpectedSize = 894 ClassSize.align(ClassSize.estimateBase(new MultiByteBuff(buf).getClass(), true) 895 + HConstants.HFILEBLOCK_HEADER_SIZE + size); 896 long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true)); 897 long hfileBlockExpectedSize = ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true)); 898 long expected = hfileBlockExpectedSize + byteBufferExpectedSize + hfileMetaSize; 899 assertEquals( 900 "Block data size: " + size + ", byte buffer expected " + "size: " + byteBufferExpectedSize 901 + ", HFileBlock class expected " + "size: " + hfileBlockExpectedSize 902 + " HFileContext class expected size: " + hfileMetaSize + "; ", 903 expected, block.heapSize()); 904 } 905 } 906 907 @Test 908 public void testSerializeWithoutNextBlockMetadata() { 909 int size = 100; 910 int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; 911 byte[] byteArr = new byte[length]; 912 ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size); 913 HFileContext meta = new HFileContextBuilder().build(); 914 HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 915 ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc); 916 HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, 917 ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc); 918 ByteBuffer buff1 = ByteBuffer.allocate(length); 919 ByteBuffer buff2 = ByteBuffer.allocate(length); 920 blockWithNextBlockMetadata.serialize(buff1, true); 921 blockWithoutNextBlockMetadata.serialize(buff2, true); 922 assertNotEquals(buff1, buff2); 923 buff1.clear(); 924 buff2.clear(); 925 blockWithNextBlockMetadata.serialize(buff1, false); 926 blockWithoutNextBlockMetadata.serialize(buff2, false); 927 assertEquals(buff1, buff2); 928 } 929}