/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.ExtendedCellBuilder;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExternalResource;
import org.junit.rules.RuleChain;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This test provides coverage for HFileBlock header fields that are read and interpreted before
 * HBase checksum validation can be applied. As of now, this is just
 * {@code onDiskSizeWithoutHeader}.
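 * Corruption is injected by writing invalid values directly into the serialized block header
 * through a read/write channel on the backing file, and each case asserts that the reader fails
 * before a corrupt block is returned.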
 */
@Category({ IOTests.class, SmallTests.class })
public class TestHFileBlockHeaderCorruption {

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlockHeaderCorruption.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHFileBlockHeaderCorruption.class);

  private final HFileTestRule hFileTestRule;

  @Rule
  public final RuleChain ruleChain;

  public TestHFileBlockHeaderCorruption() throws IOException {
    TestName testName = new TestName();
    hFileTestRule = new HFileTestRule(new HBaseTestingUtil(), testName);
    ruleChain = RuleChain.outerRule(testName).around(hFileTestRule);
  }

  @Test
  public void testOnDiskSizeWithoutHeaderCorruptionFirstBlock() throws Exception {
    HFileBlockChannelPosition firstBlock = null;
    try {
      try (HFileBlockChannelPositionIterator it =
        new HFileBlockChannelPositionIterator(hFileTestRule)) {
        assertTrue(it.hasNext());
        firstBlock = it.next();
      }

      Corrupter c = new Corrupter(firstBlock);

      logHeader(firstBlock);
      // Write an invalid (negative) onDiskSizeWithoutHeader into the first block's header; the
      // reader must fail before returning any block. The corruption is then repeated below with
      // 0 and Integer.MAX_VALUE.
      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
        ByteBuffer.wrap(Bytes.toBytes(Integer.MIN_VALUE)));
      logHeader(firstBlock);
      try (HFileBlockChannelPositionIterator it =
        new HFileBlockChannelPositionIterator(hFileTestRule)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
            .withMessage(startsWith("Invalid onDiskSizeWithHeader=")));
        }
        assertEquals(0, consumer.getItemsRead());
      }

      c.restore();
      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
        ByteBuffer.wrap(Bytes.toBytes(0)));
      logHeader(firstBlock);
      try (HFileBlockChannelPositionIterator it =
        new HFileBlockChannelPositionIterator(hFileTestRule)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IllegalArgumentException.class));
        }
        assertEquals(0, consumer.getItemsRead());
      }

      c.restore();
      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
        ByteBuffer.wrap(Bytes.toBytes(Integer.MAX_VALUE)));
      logHeader(firstBlock);
      try (HFileBlockChannelPositionIterator it =
        new HFileBlockChannelPositionIterator(hFileTestRule)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
            .withMessage(startsWith("Invalid onDiskSizeWithHeader=")));
        }
        assertEquals(0, consumer.getItemsRead());
      }
    } finally {
      if (firstBlock != null) {
        firstBlock.close();
      }
    }
  }

  @Test
  public void testOnDiskSizeWithoutHeaderCorruptionSecondBlock() throws Exception {
    HFileBlockChannelPosition secondBlock = null;
    try {
      try (HFileBlockChannelPositionIterator it =
        new HFileBlockChannelPositionIterator(hFileTestRule)) {
        assertTrue(it.hasNext());
        it.next();
        assertTrue(it.hasNext());
        secondBlock = it.next();
      }

      Corrupter c = new Corrupter(secondBlock);

      logHeader(secondBlock);
      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
        ByteBuffer.wrap(Bytes.toBytes(Integer.MIN_VALUE)));
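      // Same corrupt values as the first-block test, but written into the second block's
      // header; the intact first block should still be read, so each failing pass consumes
      // exactly one item.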
      logHeader(secondBlock);
      try (HFileBlockChannelPositionIterator it =
        new HFileBlockChannelPositionIterator(hFileTestRule)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
            .withMessage(startsWith("Invalid onDiskSizeWithHeader=")));
        }
        assertEquals(1, consumer.getItemsRead());
      }

      c.restore();
      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
        ByteBuffer.wrap(Bytes.toBytes(0)));
      logHeader(secondBlock);
      try (HFileBlockChannelPositionIterator it =
        new HFileBlockChannelPositionIterator(hFileTestRule)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IllegalArgumentException.class));
        }
        assertEquals(1, consumer.getItemsRead());
      }

      c.restore();
      c.write(HFileBlock.Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX,
        ByteBuffer.wrap(Bytes.toBytes(Integer.MAX_VALUE)));
      logHeader(secondBlock);
      try (HFileBlockChannelPositionIterator it =
        new HFileBlockChannelPositionIterator(hFileTestRule)) {
        CountingConsumer consumer = new CountingConsumer(it);
        try {
          consumer.readFully();
          fail();
        } catch (Exception e) {
          assertThat(e, new IsThrowableMatching().withInstanceOf(IOException.class)
            .withMessage(startsWith("Invalid onDiskSizeWithHeader=")));
        }
        assertEquals(1, consumer.getItemsRead());
      }
    } finally {
      if (secondBlock != null) {
        secondBlock.close();
      }
    }
  }

  private static void logHeader(HFileBlockChannelPosition hbcp) throws IOException {
    ByteBuff buf = ByteBuff.wrap(ByteBuffer.allocate(HFileBlock.headerSize(true)));
    hbcp.rewind();
    assertEquals(buf.capacity(), buf.read(hbcp.getChannel()));
    buf.rewind();
    hbcp.rewind();
    logHeader(buf);
  }

  private static void logHeader(ByteBuff buf) {
    byte[] blockMagic = new byte[8];
    buf.get(blockMagic);
    int onDiskSizeWithoutHeader = buf.getInt();
    int uncompressedSizeWithoutHeader = buf.getInt();
    long prevBlockOffset = buf.getLong();
    byte checksumType = buf.get();
    int bytesPerChecksum = buf.getInt();
    int onDiskDataSizeWithHeader = buf.getInt();
    LOG.debug(
      "blockMagic={}, onDiskSizeWithoutHeader={}, uncompressedSizeWithoutHeader={}, "
        + "prevBlockOffset={}, checksumType={}, bytesPerChecksum={}, onDiskDataSizeWithHeader={}",
      Bytes.toStringBinary(blockMagic), onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
      prevBlockOffset, checksumType, bytesPerChecksum, onDiskDataSizeWithHeader);
  }

  /**
   * Data class to enable messing with the bytes behind an {@link HFileBlock}.
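   * Holds a seekable channel over the backing file together with the block's starting offset in
   * that file.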
   */
  public static class HFileBlockChannelPosition implements Closeable {
    private final SeekableByteChannel channel;
    private final long position;

    public HFileBlockChannelPosition(SeekableByteChannel channel, long position) {
      this.channel = channel;
      this.position = position;
    }

    public SeekableByteChannel getChannel() {
      return channel;
    }

    public long getPosition() {
      return position;
    }

    public void rewind() throws IOException {
      channel.position(position);
    }

    @Override
    public void close() throws IOException {
      channel.close();
    }
  }

  /**
   * Reads blocks off of an {@link HFileBlockChannelPositionIterator}, counting them as it does.
   */
  public static class CountingConsumer {
    private final HFileBlockChannelPositionIterator iterator;
    private int itemsRead = 0;

    public CountingConsumer(HFileBlockChannelPositionIterator iterator) {
      this.iterator = iterator;
    }

    public int getItemsRead() {
      return itemsRead;
    }

    public Object readFully() throws IOException {
      Object val = null;
      for (itemsRead = 0; iterator.hasNext(); itemsRead++) {
        val = iterator.next();
      }
      return val;
    }
  }

  /**
   * A simplified wrapper over an {@link HFileBlock.BlockIterator} that looks a lot like an
   * {@link java.util.Iterator}.
   */
  public static class HFileBlockChannelPositionIterator implements Closeable {

    private final HFileTestRule hFileTestRule;
    private final HFile.Reader reader;
    private final HFileBlock.BlockIterator iter;
    private HFileBlockChannelPosition current = null;

    public HFileBlockChannelPositionIterator(HFileTestRule hFileTestRule) throws IOException {
      Configuration conf = hFileTestRule.getConfiguration();
      HFileSystem hfs = hFileTestRule.getHFileSystem();
      Path hfsPath = hFileTestRule.getPath();

      HFile.Reader reader = null;
      HFileBlock.BlockIterator iter = null;
      try {
        reader = HFile.createReader(hfs, hfsPath, CacheConfig.DISABLED, true, conf);
        HFileBlock.FSReader fsreader = reader.getUncachedBlockReader();
        iter = fsreader.blockRange(0, hfs.getFileStatus(hfsPath).getLen());
      } catch (IOException e) {
        if (reader != null) {
          closeQuietly(reader::close);
        }
        throw e;
      }

      this.hFileTestRule = hFileTestRule;
      this.reader = reader;
      this.iter = iter;
    }

    public boolean hasNext() throws IOException {
      HFileBlock next = iter.nextBlock();
      SeekableByteChannel channel = hFileTestRule.getRWChannel();
      if (next != null) {
        current = new HFileBlockChannelPosition(channel, next.getOffset());
      }
      return next != null;
    }

    public HFileBlockChannelPosition next() {
      if (current == null) {
        throw new NoSuchElementException();
      }
      HFileBlockChannelPosition ret = current;
      current = null;
      return ret;
    }

    @Override
    public void close() throws IOException {
      if (current != null) {
        closeQuietly(current::close);
      }
      closeQuietly(reader::close);
    }

    @FunctionalInterface
    private interface CloseMethod {
      void run() throws IOException;
    }

    private static void closeQuietly(CloseMethod closeMethod) {
      try {
        closeMethod.run();
      } catch (Throwable e) {
        LOG.debug("Ignoring thrown exception.", e);
      }
    }
  }

  /**
   * Enables writing and rewriting portions of the file backing an {@link HFileBlock}.
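   * The original header bytes are captured at construction time so that {@link #restore()} can
   * undo a corrupting {@link #write(int, ByteBuffer)}.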
   */
  public static class Corrupter {

    private final HFileBlockChannelPosition channelAndPosition;
    private final ByteBuffer originalHeader;

    public Corrupter(HFileBlockChannelPosition channelAndPosition) throws IOException {
      this.channelAndPosition = channelAndPosition;
      this.originalHeader = readHeaderData(channelAndPosition);
    }

    private static ByteBuffer readHeaderData(HFileBlockChannelPosition channelAndPosition)
      throws IOException {
      SeekableByteChannel channel = channelAndPosition.getChannel();
      ByteBuffer originalHeader = ByteBuffer.allocate(HFileBlock.headerSize(true));
      channelAndPosition.rewind();
      channel.read(originalHeader);
      return originalHeader;
    }

    public void write(int offset, ByteBuffer src) throws IOException {
      SeekableByteChannel channel = channelAndPosition.getChannel();
      long position = channelAndPosition.getPosition();
      channel.position(position + offset);
      channel.write(src);
    }

    public void restore() throws IOException {
      SeekableByteChannel channel = channelAndPosition.getChannel();
      originalHeader.rewind();
      channelAndPosition.rewind();
      assertEquals(originalHeader.capacity(), channel.write(originalHeader));
    }
  }

  public static class HFileTestRule extends ExternalResource {

    private final HBaseTestingUtil testingUtility;
    private final HFileSystem hfs;
    private final HFileContext context;
    private final TestName testName;
    private Path path;

    public HFileTestRule(HBaseTestingUtil testingUtility, TestName testName) throws IOException {
      this.testingUtility = testingUtility;
      this.testName = testName;
      this.hfs = (HFileSystem) HFileSystem.get(testingUtility.getConfiguration());
      this.context =
        new HFileContextBuilder().withBlockSize(4 * 1024).withHBaseCheckSum(true).build();
    }

    public Configuration getConfiguration() {
      return testingUtility.getConfiguration();
    }

    public HFileSystem getHFileSystem() {
      return hfs;
    }

    public HFileContext getHFileContext() {
      return context;
    }

    public Path getPath() {
      return path;
    }

    public SeekableByteChannel getRWChannel() throws IOException {
      java.nio.file.Path p = FileSystems.getDefault().getPath(path.toString());
      return Files.newByteChannel(p, StandardOpenOption.READ, StandardOpenOption.WRITE,
        StandardOpenOption.DSYNC);
    }

    @Override
    protected void before() throws Throwable {
      this.path = new Path(testingUtility.getDataTestDirOnTestFS(), testName.getMethodName());
      HFile.WriterFactory factory =
        HFile.getWriterFactory(testingUtility.getConfiguration(), CacheConfig.DISABLED)
          .withPath(hfs, path).withFileContext(context);

      ExtendedCellBuilder cellBuilder =
        ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
      Random rand = new Random(Instant.now().toEpochMilli());
      byte[] family = Bytes.toBytes("f");
      try (HFile.Writer writer = factory.create()) {
        for (int i = 0; i < 40; i++) {
          byte[] row = RandomKeyValueUtil.randomOrderedFixedLengthKey(rand, i, 100);
          byte[] qualifier = RandomKeyValueUtil.randomRowOrQualifier(rand);
          byte[] value = RandomKeyValueUtil.randomValue(rand);
          ExtendedCell cell = cellBuilder.setType(Cell.Type.Put).setRow(row).setFamily(family)
            .setQualifier(qualifier).setValue(value).build();
          writer.append(cell);
          cellBuilder.clear();
        }
      }
    }
  }

  /**
   * A Matcher implementation that can make basic assertions over a provided {@link Throwable}.
   * Assertion failures include the full stacktrace in their description.
   */
  private static final class IsThrowableMatching extends TypeSafeMatcher<Throwable> {

    private final List<Matcher<? super Throwable>> requirements = new LinkedList<>();

    public IsThrowableMatching withInstanceOf(Class<?> type) {
      requirements.add(instanceOf(type));
      return this;
    }

    public IsThrowableMatching withMessage(Matcher<String> matcher) {
      requirements.add(hasProperty("message", matcher));
      return this;
    }

    @Override
    protected boolean matchesSafely(Throwable throwable) {
      return allOf(requirements).matches(throwable);
    }

    @Override
    protected void describeMismatchSafely(Throwable item, Description mismatchDescription) {
      allOf(requirements).describeMismatch(item, mismatchDescription);
      // would be nice if `item` could be provided as the cause of the AssertionError instead.
      mismatchDescription.appendText(String.format("%nProvided: "));
      try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
        try (PrintStream ps = new PrintStream(baos, false, StandardCharsets.UTF_8.name())) {
          item.printStackTrace(ps);
          ps.flush();
        }
        mismatchDescription.appendText(baos.toString(StandardCharsets.UTF_8.name()));
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    @Override
    public void describeTo(Description description) {
      description.appendDescriptionOf(allOf(requirements));
    }
  }
}