001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry; 021import static org.apache.hadoop.hbase.client.trace.hamcrest.EventMatchers.hasAttributes; 022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; 023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEvents; 024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; 025import static org.hamcrest.MatcherAssert.assertThat; 026import static org.hamcrest.Matchers.allOf; 027import static org.hamcrest.Matchers.hasItem; 028import static org.hamcrest.Matchers.hasItems; 029import static org.junit.Assert.assertArrayEquals; 030import static org.junit.Assert.assertEquals; 031import static org.junit.Assert.assertFalse; 032import static org.junit.Assert.assertTrue; 033import static org.junit.Assert.fail; 034import static org.junit.Assume.assumeTrue; 035import static org.mockito.ArgumentMatchers.anyString; 036import static org.mockito.Mockito.mock; 037import static org.mockito.Mockito.verify; 038import static org.mockito.Mockito.verifyNoMoreInteractions; 039import static org.mockito.Mockito.when; 040 041import io.opentelemetry.api.trace.Span; 042import io.opentelemetry.api.trace.StatusCode; 043import io.opentelemetry.context.Scope; 044import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; 045import io.opentelemetry.sdk.trace.data.SpanData; 046import java.io.DataOutputStream; 047import java.io.IOException; 048import java.io.InputStream; 049import java.nio.ByteBuffer; 050import java.util.Random; 051import java.util.concurrent.TimeUnit; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.fs.FSDataInputStream; 054import org.apache.hadoop.fs.FSDataOutputStream; 055import org.apache.hadoop.fs.FileSystem; 056import org.apache.hadoop.fs.Path; 057import org.apache.hadoop.hbase.HBaseClassTestRule; 058import org.apache.hadoop.hbase.HBaseTestingUtil; 059import org.apache.hadoop.hbase.HConstants; 060import org.apache.hadoop.hbase.MatcherPredicate; 061import org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers; 062import org.apache.hadoop.hbase.client.trace.hamcrest.EventMatchers; 063import org.apache.hadoop.hbase.fs.HFileSystem; 064import org.apache.hadoop.hbase.io.ByteBuffAllocator; 065import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 066import org.apache.hadoop.hbase.io.compress.Compression; 067import org.apache.hadoop.hbase.io.util.BlockIOUtils; 068import org.apache.hadoop.hbase.nio.ByteBuff; 069import org.apache.hadoop.hbase.nio.MultiByteBuff; 070import org.apache.hadoop.hbase.nio.SingleByteBuff; 071import org.apache.hadoop.hbase.testclassification.IOTests; 072import org.apache.hadoop.hbase.testclassification.SmallTests; 073import org.apache.hadoop.hbase.trace.TraceUtil; 074import org.apache.hadoop.hbase.util.Bytes; 075import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 076import org.junit.ClassRule; 077import org.junit.Rule; 078import org.junit.Test; 079import org.junit.experimental.categories.Category; 080import org.junit.rules.ExpectedException; 081import org.junit.rules.TestName; 082 083@Category({ IOTests.class, SmallTests.class }) 084public class TestBlockIOUtils { 085 086 @ClassRule 087 public static final HBaseClassTestRule CLASS_RULE = 088 HBaseClassTestRule.forClass(TestBlockIOUtils.class); 089 090 @Rule 091 public TestName testName = new TestName(); 092 093 @Rule 094 public ExpectedException exception = ExpectedException.none(); 095 096 @Rule 097 public OpenTelemetryRule otelRule = OpenTelemetryRule.create(); 098 099 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 100 101 private static final int NUM_TEST_BLOCKS = 2; 102 private static final Compression.Algorithm COMPRESSION_ALGO = Compression.Algorithm.GZ; 103 104 @Test 105 public void testIsByteBufferReadable() throws IOException { 106 FileSystem fs = TEST_UTIL.getTestFileSystem(); 107 Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testIsByteBufferReadable"); 108 try (FSDataOutputStream out = fs.create(p)) { 109 out.writeInt(23); 110 } 111 try (FSDataInputStream is = fs.open(p)) { 112 assertFalse(BlockIOUtils.isByteBufferReadable(is)); 113 } 114 } 115 116 @Test 117 public void testReadFully() throws IOException { 118 TraceUtil.trace(() -> { 119 FileSystem fs = TEST_UTIL.getTestFileSystem(); 120 Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadFully"); 121 String s = "hello world"; 122 try (FSDataOutputStream out = fs.create(p)) { 123 out.writeBytes(s); 124 } 125 ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(11)); 126 try (FSDataInputStream in = fs.open(p)) { 127 BlockIOUtils.readFully(buf, in, 11); 128 } 129 buf.rewind(); 130 byte[] heapBuf = new byte[s.length()]; 131 buf.get(heapBuf, 0, heapBuf.length); 132 assertArrayEquals(Bytes.toBytes(s), heapBuf); 133 }, testName.getMethodName()); 134 135 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>( 136 otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); 137 assertThat(otelRule.getSpans(), 138 hasItems(allOf(hasName(testName.getMethodName()), 139 hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readFully"), 140 hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11)))))))); 141 } 142 143 @Test 144 public void testPreadWithReadFullBytes() throws IOException { 145 testPreadReadFullBytesInternal(true, EnvironmentEdgeManager.currentTime()); 146 } 147 148 @Test 149 public void testPreadWithoutReadFullBytes() throws IOException { 150 testPreadReadFullBytesInternal(false, EnvironmentEdgeManager.currentTime()); 151 } 152 153 private void testPreadReadFullBytesInternal(boolean readAllBytes, long randomSeed) 154 throws IOException { 155 Configuration conf = TEST_UTIL.getConfiguration(); 156 conf.setBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY, readAllBytes); 157 FileSystem fs = TEST_UTIL.getTestFileSystem(); 158 Path path = new Path(TEST_UTIL.getDataTestDirOnTestFS(), testName.getMethodName()); 159 // give a fixed seed such we can see failure easily. 160 Random rand = new Random(randomSeed); 161 long totalDataBlockBytes = 162 writeBlocks(TEST_UTIL.getConfiguration(), rand, COMPRESSION_ALGO, path); 163 readDataBlocksAndVerify(fs, path, COMPRESSION_ALGO, totalDataBlockBytes); 164 } 165 166 private long writeBlocks(Configuration conf, Random rand, Compression.Algorithm compressAlgo, 167 Path path) throws IOException { 168 FileSystem fs = HFileSystem.get(conf); 169 FSDataOutputStream os = fs.create(path); 170 HFileContext meta = 171 new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build(); 172 HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, meta); 173 long totalDataBlockBytes = 0; 174 for (int i = 0; i < NUM_TEST_BLOCKS; ++i) { 175 int blockTypeOrdinal = rand.nextInt(BlockType.values().length); 176 if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) { 177 blockTypeOrdinal = BlockType.DATA.ordinal(); 178 } 179 BlockType bt = BlockType.values()[blockTypeOrdinal]; 180 DataOutputStream dos = hbw.startWriting(bt); 181 int size = rand.nextInt(500); 182 for (int j = 0; j < size; ++j) { 183 dos.writeShort(i + 1); 184 dos.writeInt(j + 1); 185 } 186 187 hbw.writeHeaderAndData(os); 188 totalDataBlockBytes += hbw.getOnDiskSizeWithHeader(); 189 } 190 // append a dummy trailer and in a actual HFile it should have more data. 191 FixedFileTrailer trailer = new FixedFileTrailer(3, 3); 192 trailer.setFirstDataBlockOffset(0); 193 trailer.setLastDataBlockOffset(totalDataBlockBytes); 194 trailer.setComparatorClass(meta.getCellComparator().getClass()); 195 trailer.setDataIndexCount(NUM_TEST_BLOCKS); 196 trailer.setCompressionCodec(compressAlgo); 197 trailer.serialize(os); 198 // close the stream 199 os.close(); 200 return totalDataBlockBytes; 201 } 202 203 private void readDataBlocksAndVerify(FileSystem fs, Path path, Compression.Algorithm compressAlgo, 204 long totalDataBlockBytes) throws IOException { 205 FSDataInputStream is = fs.open(path); 206 HFileContext fileContext = 207 new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build(); 208 ReaderContext context = 209 new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is)) 210 .withReaderType(ReaderContext.ReaderType.PREAD).withFileSize(totalDataBlockBytes) 211 .withFilePath(path).withFileSystem(fs).build(); 212 HFileBlock.FSReader hbr = 213 new HFileBlock.FSReaderImpl(context, fileContext, ByteBuffAllocator.HEAP, fs.getConf()); 214 215 long onDiskSizeOfNextBlock = -1; 216 long offset = 0; 217 int numOfReadBlock = 0; 218 // offset and totalBytes shares the same logic in the HFilePreadReader 219 while (offset < totalDataBlockBytes) { 220 HFileBlock block = hbr.readBlockData(offset, onDiskSizeOfNextBlock, true, false, false); 221 numOfReadBlock++; 222 try { 223 onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize(); 224 offset += block.getOnDiskSizeWithHeader(); 225 } finally { 226 block.release(); 227 } 228 } 229 assertEquals(totalDataBlockBytes, offset); 230 assertEquals(NUM_TEST_BLOCKS, numOfReadBlock); 231 deleteFile(fs, path); 232 } 233 234 private void deleteFile(FileSystem fs, Path path) throws IOException { 235 if (fs.exists(path)) { 236 fs.delete(path, true); 237 } 238 } 239 240 @Test 241 public void testReadWithExtra() throws IOException { 242 FileSystem fs = TEST_UTIL.getTestFileSystem(); 243 Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadWithExtra"); 244 String s = "hello world"; 245 try (FSDataOutputStream out = fs.create(p)) { 246 out.writeBytes(s); 247 } 248 249 Span span = TraceUtil.createSpan(testName.getMethodName()); 250 try (Scope ignored = span.makeCurrent()) { 251 ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(8)); 252 try (FSDataInputStream in = fs.open(p)) { 253 assertTrue(BlockIOUtils.readWithExtra(buf, in, 6, 2)); 254 } 255 buf.rewind(); 256 byte[] heapBuf = new byte[buf.capacity()]; 257 buf.get(heapBuf, 0, heapBuf.length); 258 assertArrayEquals(Bytes.toBytes("hello wo"), heapBuf); 259 } finally { 260 span.end(); 261 } 262 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>( 263 otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); 264 assertThat(otelRule.getSpans(), 265 hasItems(allOf(hasName(testName.getMethodName()), 266 hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"), 267 hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 8L)))))))); 268 269 otelRule.clearSpans(); 270 span = TraceUtil.createSpan(testName.getMethodName()); 271 try (Scope ignored = span.makeCurrent()) { 272 ByteBuff buf = 273 new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4), ByteBuffer.allocate(4)); 274 try (FSDataInputStream in = fs.open(p)) { 275 assertTrue(BlockIOUtils.readWithExtra(buf, in, 8, 3)); 276 } 277 buf.rewind(); 278 byte[] heapBuf = new byte[11]; 279 buf.get(heapBuf, 0, heapBuf.length); 280 assertArrayEquals(Bytes.toBytes("hello world"), heapBuf); 281 } finally { 282 span.end(); 283 } 284 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>( 285 otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); 286 assertThat(otelRule.getSpans(), 287 hasItems(allOf(hasName(testName.getMethodName()), 288 hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"), 289 hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11L)))))))); 290 291 otelRule.clearSpans(); 292 span = TraceUtil.createSpan(testName.getMethodName()); 293 try (Scope ignored = span.makeCurrent()) { 294 ByteBuff buf = 295 new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4), ByteBuffer.allocate(4)); 296 buf.position(0).limit(12); 297 exception.expect(IOException.class); 298 try (FSDataInputStream in = fs.open(p)) { 299 BlockIOUtils.readWithExtra(buf, in, 12, 0); 300 fail("Should only read 11 bytes"); 301 } 302 } finally { 303 span.end(); 304 } 305 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>( 306 otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); 307 assertThat(otelRule.getSpans(), 308 hasItems(allOf(hasName(testName.getMethodName()), 309 hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"), 310 hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11L)))))))); 311 } 312 313 @Test 314 public void testPositionalReadNoExtra() throws IOException { 315 long position = 0; 316 int bufOffset = 0; 317 int necessaryLen = 10; 318 int extraLen = 0; 319 int totalLen = necessaryLen + extraLen; 320 byte[] buf = new byte[totalLen]; 321 ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); 322 FSDataInputStream in = mock(FSDataInputStream.class); 323 when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen); 324 when(in.hasCapability(anyString())).thenReturn(false); 325 boolean ret = 326 TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen), 327 testName.getMethodName()); 328 assertFalse("Expect false return when no extra bytes requested", ret); 329 verify(in).read(position, buf, bufOffset, totalLen); 330 verify(in).hasCapability(anyString()); 331 verifyNoMoreInteractions(in); 332 333 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>( 334 otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); 335 assertThat(otelRule.getSpans(), 336 hasItems(allOf(hasName(testName.getMethodName()), 337 hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"), 338 hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen)))))))); 339 } 340 341 @Test 342 public void testPositionalReadShortReadOfNecessaryBytes() throws IOException { 343 long position = 0; 344 int bufOffset = 0; 345 int necessaryLen = 10; 346 int extraLen = 0; 347 int totalLen = necessaryLen + extraLen; 348 byte[] buf = new byte[totalLen]; 349 ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); 350 FSDataInputStream in = mock(FSDataInputStream.class); 351 when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5); 352 when(in.read(5, buf, 5, 5)).thenReturn(5); 353 when(in.hasCapability(anyString())).thenReturn(false); 354 boolean ret = 355 TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen), 356 testName.getMethodName()); 357 assertFalse("Expect false return when no extra bytes requested", ret); 358 verify(in).read(position, buf, bufOffset, totalLen); 359 verify(in).read(5, buf, 5, 5); 360 verify(in).hasCapability(anyString()); 361 verifyNoMoreInteractions(in); 362 363 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>( 364 otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); 365 assertThat(otelRule.getSpans(), 366 hasItems(allOf(hasName(testName.getMethodName()), 367 hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"), 368 hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen)))))))); 369 } 370 371 @Test 372 public void testPositionalReadExtraSucceeded() throws IOException { 373 long position = 0; 374 int bufOffset = 0; 375 int necessaryLen = 10; 376 int extraLen = 5; 377 int totalLen = necessaryLen + extraLen; 378 byte[] buf = new byte[totalLen]; 379 ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); 380 FSDataInputStream in = mock(FSDataInputStream.class); 381 when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen); 382 when(in.hasCapability(anyString())).thenReturn(false); 383 boolean ret = 384 TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen), 385 testName.getMethodName()); 386 assertTrue("Expect true return when reading extra bytes succeeds", ret); 387 verify(in).read(position, buf, bufOffset, totalLen); 388 verify(in).hasCapability(anyString()); 389 verifyNoMoreInteractions(in); 390 391 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>( 392 otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); 393 assertThat(otelRule.getSpans(), 394 hasItems(allOf(hasName(testName.getMethodName()), 395 hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"), 396 hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen)))))))); 397 } 398 399 @Test 400 public void testPositionalReadExtraFailed() throws IOException { 401 long position = 0; 402 int bufOffset = 0; 403 int necessaryLen = 10; 404 int extraLen = 5; 405 int totalLen = necessaryLen + extraLen; 406 byte[] buf = new byte[totalLen]; 407 ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); 408 FSDataInputStream in = mock(FSDataInputStream.class); 409 when(in.read(position, buf, bufOffset, totalLen)).thenReturn(necessaryLen); 410 when(in.hasCapability(anyString())).thenReturn(false); 411 boolean ret = 412 TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen), 413 testName.getMethodName()); 414 assertFalse("Expect false return when reading extra bytes fails", ret); 415 verify(in).read(position, buf, bufOffset, totalLen); 416 verify(in).hasCapability(anyString()); 417 verifyNoMoreInteractions(in); 418 419 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>( 420 otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); 421 assertThat(otelRule.getSpans(), 422 hasItems(allOf(hasName(testName.getMethodName()), 423 hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"), 424 hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", necessaryLen)))))))); 425 } 426 427 @Test 428 public void testPositionalReadShortReadCompletesNecessaryAndExtraBytes() throws IOException { 429 long position = 0; 430 int bufOffset = 0; 431 int necessaryLen = 10; 432 int extraLen = 5; 433 int totalLen = necessaryLen + extraLen; 434 byte[] buf = new byte[totalLen]; 435 ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); 436 FSDataInputStream in = mock(FSDataInputStream.class); 437 when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5); 438 when(in.read(5, buf, 5, 10)).thenReturn(10); 439 when(in.hasCapability(anyString())).thenReturn(false); 440 boolean ret = 441 TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen), 442 testName.getMethodName()); 443 assertTrue("Expect true return when reading extra bytes succeeds", ret); 444 verify(in).read(position, buf, bufOffset, totalLen); 445 verify(in).read(5, buf, 5, 10); 446 verify(in).hasCapability(anyString()); 447 verifyNoMoreInteractions(in); 448 449 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>( 450 otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); 451 assertThat(otelRule.getSpans(), 452 hasItems(allOf(hasName(testName.getMethodName()), 453 hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"), 454 hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen)))))))); 455 } 456 457 @Test 458 public void testPositionalReadPrematureEOF() throws IOException { 459 long position = 0; 460 int bufOffset = 0; 461 int necessaryLen = 10; 462 int extraLen = 0; 463 int totalLen = necessaryLen + extraLen; 464 byte[] buf = new byte[totalLen]; 465 ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); 466 FSDataInputStream in = mock(FSDataInputStream.class); 467 when(in.read(position, buf, bufOffset, totalLen)).thenReturn(9); 468 when(in.read(position, buf, bufOffset, totalLen)).thenReturn(-1); 469 when(in.hasCapability(anyString())).thenReturn(false); 470 exception.expect(IOException.class); 471 exception.expectMessage("EOF"); 472 Span span = TraceUtil.createSpan(testName.getMethodName()); 473 try (Scope ignored = span.makeCurrent()) { 474 BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); 475 span.setStatus(StatusCode.OK); 476 } catch (IOException e) { 477 TraceUtil.setError(span, e); 478 throw e; 479 } finally { 480 span.end(); 481 482 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>( 483 otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded())))); 484 assertThat(otelRule.getSpans(), 485 hasItems(allOf(hasName(testName.getMethodName()), 486 hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"), 487 hasAttributes(AttributesMatchers.isEmpty()))))))); 488 } 489 } 490 491 /** 492 * Determine if ByteBufferPositionedReadable API is available . 493 * @return true if FSDataInputStream implements ByteBufferPositionedReadable API. 494 */ 495 private boolean isByteBufferPositionedReadable() { 496 try { 497 // long position, ByteBuffer buf 498 FSDataInputStream.class.getMethod("read", long.class, ByteBuffer.class); 499 } catch (NoSuchMethodException e) { 500 return false; 501 } 502 return true; 503 } 504 505 public static class MyFSDataInputStream extends FSDataInputStream { 506 public MyFSDataInputStream(InputStream in) { 507 super(in); 508 } 509 510 // This is the ByteBufferPositionReadable API we want to test. 511 // Because the API is only available in Hadoop 3.3, FSDataInputStream in older Hadoop 512 // does not implement the interface, and it wouldn't compile trying to mock the method. 513 // So explicitly declare the method here to make mocking possible. 514 public int read(long position, ByteBuffer buf) throws IOException { 515 return 0; 516 } 517 } 518 519 @Test 520 public void testByteBufferPositionedReadable() throws IOException { 521 assumeTrue("Skip the test because ByteBufferPositionedReadable is not available", 522 isByteBufferPositionedReadable()); 523 long position = 0; 524 int necessaryLen = 10; 525 int extraLen = 1; 526 int totalLen = necessaryLen + extraLen; 527 int firstReadLen = 6; 528 int secondReadLen = totalLen - firstReadLen; 529 ByteBuffer buf = ByteBuffer.allocate(totalLen); 530 ByteBuff bb = new SingleByteBuff(buf); 531 MyFSDataInputStream in = mock(MyFSDataInputStream.class); 532 533 when(in.read(position, buf)).thenReturn(firstReadLen); 534 when(in.read(firstReadLen, buf)).thenReturn(secondReadLen); 535 when(in.hasCapability(anyString())).thenReturn(true); 536 boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); 537 assertTrue("Expect true return when reading extra bytes succeeds", ret); 538 verify(in).read(position, buf); 539 verify(in).read(firstReadLen, buf); 540 verify(in).hasCapability(anyString()); 541 verifyNoMoreInteractions(in); 542 } 543 544 @Test 545 public void testByteBufferPositionedReadableEOF() throws IOException { 546 assumeTrue("Skip the test because ByteBufferPositionedReadable is not available", 547 isByteBufferPositionedReadable()); 548 long position = 0; 549 int necessaryLen = 10; 550 int extraLen = 0; 551 int totalLen = necessaryLen + extraLen; 552 int firstReadLen = 9; 553 ByteBuffer buf = ByteBuffer.allocate(totalLen); 554 ByteBuff bb = new SingleByteBuff(buf); 555 MyFSDataInputStream in = mock(MyFSDataInputStream.class); 556 557 when(in.read(position, buf)).thenReturn(firstReadLen); 558 when(in.read(position, buf)).thenReturn(-1); 559 when(in.hasCapability(anyString())).thenReturn(true); 560 exception.expect(IOException.class); 561 exception.expectMessage("EOF"); 562 BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); 563 564 verify(in).read(position, buf); 565 verify(in).read(firstReadLen, buf); 566 verify(in).hasCapability(anyString()); 567 verifyNoMoreInteractions(in); 568 } 569}