001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertSame; 026import static org.junit.Assert.assertThrows; 027import static org.junit.Assert.assertTrue; 028import static org.mockito.Mockito.doNothing; 029import static org.mockito.Mockito.mock; 030import static org.mockito.Mockito.when; 031 032import java.io.ByteArrayOutputStream; 033import java.io.IOException; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.HashMap; 037import java.util.Map; 038import java.util.NavigableMap; 039import java.util.OptionalLong; 040import java.util.TreeMap; 041import java.util.UUID; 042import java.util.concurrent.ExecutionException; 043import java.util.concurrent.ForkJoinPool; 044import java.util.concurrent.Future; 045import java.util.concurrent.PriorityBlockingQueue; 046import java.util.concurrent.atomic.AtomicBoolean; 047import java.util.concurrent.atomic.AtomicInteger; 048import java.util.concurrent.atomic.AtomicLong; 049import org.apache.commons.io.IOUtils; 050import org.apache.hadoop.conf.Configuration; 051import org.apache.hadoop.fs.FSDataInputStream; 052import org.apache.hadoop.fs.FSDataOutputStream; 053import org.apache.hadoop.fs.Path; 054import org.apache.hadoop.hbase.Cell; 055import org.apache.hadoop.hbase.HConstants; 056import org.apache.hadoop.hbase.KeyValue; 057import org.apache.hadoop.hbase.Server; 058import org.apache.hadoop.hbase.TableName; 059import org.apache.hadoop.hbase.Waiter; 060import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 061import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 062import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader; 063import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; 064import org.apache.hadoop.hbase.replication.WALEntryFilter; 065import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.HasNext; 066import org.apache.hadoop.hbase.util.Bytes; 067import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 068import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 069import org.apache.hadoop.hbase.wal.WAL; 070import org.apache.hadoop.hbase.wal.WALEdit; 071import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 072import org.apache.hadoop.hbase.wal.WALFactory; 073import org.apache.hadoop.hbase.wal.WALKeyImpl; 074import org.apache.hadoop.hbase.wal.WALProvider; 075import org.junit.Assert; 076import org.junit.Before; 077import org.junit.Test; 078import org.junit.runners.Parameterized.Parameter; 079import org.junit.runners.Parameterized.Parameters; 080import org.mockito.Mockito; 081 082import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; 084 085public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase { 086 087 @Parameter 088 public boolean isCompressionEnabled; 089 090 @Parameters(name = "{index}: isCompressionEnabled={0}") 091 public static Iterable<Object[]> data() { 092 return Arrays.asList(new Object[] { false }, new Object[] { true }); 093 } 094 095 @Before 096 public void setUp() throws Exception { 097 CONF.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, isCompressionEnabled); 098 initWAL(); 099 } 100 101 private WAL.Entry next(WALEntryStream entryStream) { 102 assertEquals(HasNext.YES, entryStream.hasNext()); 103 return entryStream.next(); 104 } 105 106 /** 107 * Tests basic reading of log appends 108 */ 109 @Test 110 public void testAppendsWithRolls() throws Exception { 111 appendToLogAndSync(); 112 long oldPos; 113 try (WALEntryStream entryStream = 114 new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) { 115 // There's one edit in the log, read it. Reading past it needs to throw exception 116 assertEquals(HasNext.YES, entryStream.hasNext()); 117 WAL.Entry entry = entryStream.peek(); 118 assertSame(entry, entryStream.next()); 119 assertNotNull(entry); 120 assertEquals(HasNext.RETRY, entryStream.hasNext()); 121 assertNull(entryStream.peek()); 122 assertThrows(IllegalStateException.class, () -> entryStream.next()); 123 oldPos = entryStream.getPosition(); 124 } 125 126 appendToLogAndSync(); 127 128 try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF, oldPos, log, 129 new MetricsSource("1"), fakeWalGroupId)) { 130 // Read the newly added entry, make sure we made progress 131 WAL.Entry entry = next(entryStream); 132 assertNotEquals(oldPos, entryStream.getPosition()); 133 assertNotNull(entry); 134 oldPos = entryStream.getPosition(); 135 } 136 137 // We rolled but we still should see the end of the first log and get that item 138 appendToLogAndSync(); 139 log.rollWriter(); 140 appendToLogAndSync(); 141 142 try (WALEntryStreamWithRetries entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF, 143 oldPos, log, new MetricsSource("1"), fakeWalGroupId)) { 144 WAL.Entry entry = next(entryStream); 145 assertNotEquals(oldPos, entryStream.getPosition()); 146 assertNotNull(entry); 147 148 // next item should come from the new log 149 entry = next(entryStream); 150 assertNotEquals(oldPos, entryStream.getPosition()); 151 assertNotNull(entry); 152 153 // no more entries to read, disable retry otherwise we will get a wait too much time error 154 entryStream.disableRetry(); 155 assertEquals(HasNext.RETRY, entryStream.hasNext()); 156 oldPos = entryStream.getPosition(); 157 } 158 } 159 160 /** 161 * Tests that if after a stream is opened, more entries come in and then the log is rolled, we 162 * don't mistakenly dequeue the current log thinking we're done with it 163 */ 164 @Test 165 public void testLogRollWhileStreaming() throws Exception { 166 appendToLog("1"); 167 // 2 168 appendToLog("2"); 169 try (WALEntryStreamWithRetries entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF, 170 0, log, new MetricsSource("1"), fakeWalGroupId)) { 171 assertEquals("1", getRow(next(entryStream))); 172 173 // 3 - comes in after reader opened 174 appendToLog("3"); 175 // log roll happening while we're reading 176 log.rollWriter(); 177 // 4 - this append is in the rolled log 178 appendToLog("4"); 179 180 assertEquals("2", getRow(next(entryStream))); 181 // we should not have dequeued yet since there's still an entry in first log 182 assertEquals(2, getQueue().size()); 183 // if implemented improperly, this would be 4 and 3 would be skipped 184 assertEquals("3", getRow(next(entryStream))); 185 // 4 186 assertEquals("4", getRow(next(entryStream))); 187 // now we've dequeued and moved on to next log properly 188 assertEquals(1, getQueue().size()); 189 190 // disable so we can get the return value immediately, otherwise we will fail with wait too 191 // much time... 192 entryStream.disableRetry(); 193 assertEquals(HasNext.RETRY, entryStream.hasNext()); 194 } 195 } 196 197 /** 198 * Tests that if writes come in while we have a stream open, we shouldn't miss them 199 */ 200 201 @Test 202 public void testNewEntriesWhileStreaming() throws Exception { 203 appendToLog("1"); 204 try (WALEntryStream entryStream = 205 new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) { 206 assertNotNull(next(entryStream)); // we've hit the end of the stream at this point 207 208 // some new entries come in while we're streaming 209 appendToLog("2"); 210 appendToLog("3"); 211 212 // don't see them 213 assertEquals(HasNext.RETRY, entryStream.hasNext()); 214 215 // But we do if we retry next time, as the entryStream will reset the reader 216 assertEquals("2", getRow(next(entryStream))); 217 assertEquals("3", getRow(next(entryStream))); 218 // reached the end again 219 assertEquals(HasNext.RETRY, entryStream.hasNext()); 220 } 221 } 222 223 @Test 224 public void testResumeStreamingFromPosition() throws Exception { 225 long lastPosition = 0; 226 appendToLog("1"); 227 try (WALEntryStream entryStream = 228 new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) { 229 assertNotNull(next(entryStream)); // we've hit the end of the stream at this point 230 appendToLog("2"); 231 appendToLog("3"); 232 lastPosition = entryStream.getPosition(); 233 } 234 // next stream should picks up where we left off 235 try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log, 236 new MetricsSource("1"), fakeWalGroupId)) { 237 assertEquals("2", getRow(next(entryStream))); 238 assertEquals("3", getRow(next(entryStream))); 239 assertEquals(HasNext.RETRY, entryStream.hasNext()); // done 240 assertEquals(1, getQueue().size()); 241 } 242 } 243 244 /** 245 * Tests that if we stop before hitting the end of a stream, we can continue where we left off 246 * using the last position 247 */ 248 @Test 249 public void testPosition() throws Exception { 250 long lastPosition = 0; 251 appendEntriesToLogAndSync(3); 252 // read only one element 253 try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log, 254 new MetricsSource("1"), fakeWalGroupId)) { 255 assertNotNull(next(entryStream)); 256 lastPosition = entryStream.getPosition(); 257 } 258 // there should still be two more entries from where we left off 259 try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log, 260 new MetricsSource("1"), fakeWalGroupId)) { 261 assertNotNull(next(entryStream)); 262 assertNotNull(next(entryStream)); 263 assertEquals(HasNext.RETRY, entryStream.hasNext()); 264 } 265 } 266 267 @Test 268 public void testEmptyStream() throws Exception { 269 try (WALEntryStream entryStream = 270 new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) { 271 assertEquals(HasNext.RETRY, entryStream.hasNext()); 272 } 273 } 274 275 @Test 276 public void testWALKeySerialization() throws Exception { 277 Map<String, byte[]> attributes = new HashMap<String, byte[]>(); 278 attributes.put("foo", Bytes.toBytes("foo-value")); 279 attributes.put("bar", Bytes.toBytes("bar-value")); 280 WALKeyImpl key = 281 new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, EnvironmentEdgeManager.currentTime(), 282 new ArrayList<UUID>(), 0L, 0L, mvcc, scopes, attributes); 283 Assert.assertEquals(attributes, key.getExtendedAttributes()); 284 285 WALProtos.WALKey.Builder builder = key.getBuilder(WALCellCodec.getNoneCompressor()); 286 WALProtos.WALKey serializedKey = builder.build(); 287 288 WALKeyImpl deserializedKey = new WALKeyImpl(); 289 deserializedKey.readFieldsFromPb(serializedKey, WALCellCodec.getNoneUncompressor()); 290 291 // equals() only checks region name, sequence id and write time 292 Assert.assertEquals(key, deserializedKey); 293 // can't use Map.equals() because byte arrays use reference equality 294 Assert.assertEquals(key.getExtendedAttributes().keySet(), 295 deserializedKey.getExtendedAttributes().keySet()); 296 for (Map.Entry<String, byte[]> entry : deserializedKey.getExtendedAttributes().entrySet()) { 297 Assert.assertArrayEquals(key.getExtendedAttribute(entry.getKey()), entry.getValue()); 298 } 299 Assert.assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes()); 300 } 301 302 private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) 303 throws IOException { 304 ReplicationSourceManager mockSourceManager = new ReplicationSourceManager(null, null, conf, 305 null, null, null, null, null, null, null, createMockGlobalMetrics()); 306 Server mockServer = Mockito.mock(Server.class); 307 ReplicationSource source = Mockito.mock(ReplicationSource.class); 308 when(source.getSourceManager()).thenReturn(mockSourceManager); 309 when(source.getSourceMetrics()).thenReturn(new MetricsSource("1")); 310 when(source.getWALFileLengthProvider()).thenReturn(log); 311 when(source.getServer()).thenReturn(mockServer); 312 when(source.isRecovered()).thenReturn(recovered); 313 return source; 314 } 315 316 private MetricsReplicationGlobalSourceSource createMockGlobalMetrics() { 317 MetricsReplicationGlobalSourceSource globalMetrics = 318 Mockito.mock(MetricsReplicationGlobalSourceSource.class); 319 final AtomicLong bufferUsedCounter = new AtomicLong(0); 320 Mockito.doAnswer((invocationOnMock) -> { 321 bufferUsedCounter.set(invocationOnMock.getArgument(0, Long.class)); 322 return null; 323 }).when(globalMetrics).setWALReaderEditsBufferBytes(Mockito.anyLong()); 324 when(globalMetrics.getWALReaderEditsBufferBytes()) 325 .then(invocationOnMock -> bufferUsedCounter.get()); 326 return globalMetrics; 327 } 328 329 private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) 330 throws IOException { 331 ReplicationSource source = mockReplicationSource(recovered, conf); 332 when(source.isPeerEnabled()).thenReturn(true); 333 ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0, 334 getDummyFilter(), source, fakeWalGroupId); 335 reader.start(); 336 return reader; 337 } 338 339 private ReplicationSourceWALReader createReaderWithBadReplicationFilter(int numFailures, 340 Configuration conf) throws IOException { 341 ReplicationSource source = mockReplicationSource(false, conf); 342 when(source.isPeerEnabled()).thenReturn(true); 343 ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0, 344 getIntermittentFailingFilter(numFailures), source, fakeWalGroupId); 345 reader.start(); 346 return reader; 347 } 348 349 @Test 350 public void testReplicationSourceWALReader() throws Exception { 351 appendEntriesToLogAndSync(3); 352 // get ending position 353 long position; 354 try (WALEntryStream entryStream = 355 new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) { 356 for (int i = 0; i < 3; i++) { 357 assertNotNull(next(entryStream)); 358 } 359 position = entryStream.getPosition(); 360 } 361 362 // start up a reader 363 Path walPath = getQueue().peek(); 364 ReplicationSourceWALReader reader = createReader(false, CONF); 365 WALEntryBatch entryBatch = reader.take(); 366 367 // should've batched up our entries 368 assertNotNull(entryBatch); 369 assertEquals(3, entryBatch.getWalEntries().size()); 370 assertEquals(position, entryBatch.getLastWalPosition()); 371 assertEquals(walPath, entryBatch.getLastWalPath()); 372 assertEquals(3, entryBatch.getNbRowKeys()); 373 374 appendToLog("foo"); 375 entryBatch = reader.take(); 376 assertEquals(1, entryBatch.getNbEntries()); 377 assertEquals("foo", getRow(entryBatch.getWalEntries().get(0))); 378 } 379 380 @Test 381 public void testReplicationSourceWALReaderWithFailingFilter() throws Exception { 382 appendEntriesToLogAndSync(3); 383 // get ending position 384 long position; 385 try (WALEntryStream entryStream = 386 new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) { 387 for (int i = 0; i < 3; i++) { 388 assertNotNull(next(entryStream)); 389 } 390 position = entryStream.getPosition(); 391 } 392 393 // start up a reader 394 Path walPath = getQueue().peek(); 395 int numFailuresInFilter = 5; 396 ReplicationSourceWALReader reader = 397 createReaderWithBadReplicationFilter(numFailuresInFilter, CONF); 398 WALEntryBatch entryBatch = reader.take(); 399 assertEquals(numFailuresInFilter, FailingWALEntryFilter.numFailures()); 400 401 // should've batched up our entries 402 assertNotNull(entryBatch); 403 assertEquals(3, entryBatch.getWalEntries().size()); 404 assertEquals(position, entryBatch.getLastWalPosition()); 405 assertEquals(walPath, entryBatch.getLastWalPath()); 406 assertEquals(3, entryBatch.getNbRowKeys()); 407 } 408 409 @Test 410 public void testReplicationSourceWALReaderRecovered() throws Exception { 411 appendEntriesToLogAndSync(10); 412 Path walPath = getQueue().peek(); 413 log.rollWriter(); 414 appendEntriesToLogAndSync(5); 415 log.shutdown(); 416 417 Configuration conf = new Configuration(CONF); 418 conf.setInt("replication.source.nb.capacity", 10); 419 420 ReplicationSourceWALReader reader = createReader(true, conf); 421 422 WALEntryBatch batch = reader.take(); 423 assertEquals(walPath, batch.getLastWalPath()); 424 assertEquals(10, batch.getNbEntries()); 425 assertFalse(batch.isEndOfFile()); 426 427 batch = reader.take(); 428 assertEquals(walPath, batch.getLastWalPath()); 429 assertEquals(0, batch.getNbEntries()); 430 assertTrue(batch.isEndOfFile()); 431 432 walPath = getQueue().peek(); 433 batch = reader.take(); 434 assertEquals(walPath, batch.getLastWalPath()); 435 assertEquals(5, batch.getNbEntries()); 436 assertTrue(batch.isEndOfFile()); 437 438 assertSame(WALEntryBatch.NO_MORE_DATA, reader.take()); 439 } 440 441 // Testcase for HBASE-20206 442 @Test 443 public void testReplicationSourceWALReaderWrongPosition() throws Exception { 444 appendEntriesToLogAndSync(1); 445 Path walPath = getQueue().peek(); 446 log.rollWriter(); 447 appendEntriesToLogAndSync(20); 448 TEST_UTIL.waitFor(5000, new ExplainingPredicate<Exception>() { 449 450 @Override 451 public boolean evaluate() throws Exception { 452 return fs.getFileStatus(walPath).getLen() > 0 453 && ((AbstractFSWAL<?>) log).getInflightWALCloseCount() == 0; 454 } 455 456 @Override 457 public String explainFailure() throws Exception { 458 return walPath + " has not been closed yet"; 459 } 460 461 }); 462 463 ReplicationSourceWALReader reader = createReader(false, CONF); 464 465 WALEntryBatch entryBatch = reader.take(); 466 assertEquals(walPath, entryBatch.getLastWalPath()); 467 468 long walLength = fs.getFileStatus(walPath).getLen(); 469 assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " 470 + walLength, entryBatch.getLastWalPosition() <= walLength); 471 assertEquals(1, entryBatch.getNbEntries()); 472 assertTrue(entryBatch.isEndOfFile()); 473 474 Path walPath2 = getQueue().peek(); 475 entryBatch = reader.take(); 476 assertEquals(walPath2, entryBatch.getLastWalPath()); 477 assertEquals(20, entryBatch.getNbEntries()); 478 assertFalse(entryBatch.isEndOfFile()); 479 480 log.rollWriter(); 481 appendEntriesToLogAndSync(10); 482 entryBatch = reader.take(); 483 assertEquals(walPath2, entryBatch.getLastWalPath()); 484 assertEquals(0, entryBatch.getNbEntries()); 485 assertTrue(entryBatch.isEndOfFile()); 486 487 Path walPath3 = getQueue().peek(); 488 entryBatch = reader.take(); 489 assertEquals(walPath3, entryBatch.getLastWalPath()); 490 assertEquals(10, entryBatch.getNbEntries()); 491 assertFalse(entryBatch.isEndOfFile()); 492 } 493 494 @Test 495 public void testReplicationSourceWALReaderDisabled() 496 throws IOException, InterruptedException, ExecutionException { 497 appendEntriesToLogAndSync(3); 498 // get ending position 499 long position; 500 try (WALEntryStream entryStream = 501 new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) { 502 for (int i = 0; i < 3; i++) { 503 assertNotNull(next(entryStream)); 504 } 505 position = entryStream.getPosition(); 506 } 507 508 // start up a reader 509 Path walPath = getQueue().peek(); 510 ReplicationSource source = mockReplicationSource(false, CONF); 511 AtomicInteger invokeCount = new AtomicInteger(0); 512 AtomicBoolean enabled = new AtomicBoolean(false); 513 when(source.isPeerEnabled()).then(i -> { 514 invokeCount.incrementAndGet(); 515 return enabled.get(); 516 }); 517 518 ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, CONF, logQueue, 0, 519 getDummyFilter(), source, fakeWalGroupId); 520 reader.start(); 521 Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> { 522 return reader.take(); 523 }); 524 // make sure that the isPeerEnabled has been called several times 525 TEST_UTIL.waitFor(30000, () -> invokeCount.get() >= 5); 526 // confirm that we can read nothing if the peer is disabled 527 assertFalse(future.isDone()); 528 // then enable the peer, we should get the batch 529 enabled.set(true); 530 WALEntryBatch entryBatch = future.get(); 531 532 // should've batched up our entries 533 assertNotNull(entryBatch); 534 assertEquals(3, entryBatch.getWalEntries().size()); 535 assertEquals(position, entryBatch.getLastWalPosition()); 536 assertEquals(walPath, entryBatch.getLastWalPath()); 537 assertEquals(3, entryBatch.getNbRowKeys()); 538 } 539 540 private String getRow(WAL.Entry entry) { 541 Cell cell = entry.getEdit().getCells().get(0); 542 return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); 543 } 544 545 private void appendToLog(String key) throws IOException { 546 final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 547 EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdit(key)); 548 log.sync(txid); 549 } 550 551 private void appendEntriesToLogAndSync(int count) throws IOException { 552 long txid = -1L; 553 for (int i = 0; i < count; i++) { 554 txid = appendToLog(1); 555 } 556 log.sync(txid); 557 } 558 559 private WALEdit getWALEdit(String row) { 560 WALEdit edit = new WALEdit(); 561 WALEditInternalHelper.addExtendedCell(edit, new KeyValue(Bytes.toBytes(row), family, qualifier, 562 EnvironmentEdgeManager.currentTime(), qualifier)); 563 return edit; 564 } 565 566 private WALEntryFilter getDummyFilter() { 567 return new WALEntryFilter() { 568 569 @Override 570 public WAL.Entry filter(WAL.Entry entry) { 571 return entry; 572 } 573 }; 574 } 575 576 private WALEntryFilter getIntermittentFailingFilter(int numFailuresInFilter) { 577 return new FailingWALEntryFilter(numFailuresInFilter); 578 } 579 580 public static class FailingWALEntryFilter implements WALEntryFilter { 581 private int numFailures = 0; 582 private static int countFailures = 0; 583 584 public FailingWALEntryFilter(int numFailuresInFilter) { 585 numFailures = numFailuresInFilter; 586 } 587 588 @Override 589 public WAL.Entry filter(WAL.Entry entry) { 590 if (countFailures == numFailures) { 591 return entry; 592 } 593 countFailures = countFailures + 1; 594 throw new WALEntryFilterRetryableException("failing filter"); 595 } 596 597 public static int numFailures() { 598 return countFailures; 599 } 600 } 601 602 @Test 603 public void testReadBeyondCommittedLength() throws IOException, InterruptedException { 604 appendToLog("1"); 605 appendToLog("2"); 606 long size = log.getLogFileSizeIfBeingWritten(getQueue().peek()).getAsLong(); 607 AtomicLong fileLength = new AtomicLong(size - 1); 608 try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, 0, 609 p -> OptionalLong.of(fileLength.get()), new MetricsSource("1"), fakeWalGroupId)) { 610 assertNotNull(next(entryStream)); 611 // can not get log 2 612 assertEquals(HasNext.RETRY, entryStream.hasNext()); 613 Thread.sleep(1000); 614 // still can not get log 2 615 assertEquals(HasNext.RETRY, entryStream.hasNext()); 616 617 // can get log 2 now 618 fileLength.set(size); 619 assertNotNull(next(entryStream)); 620 621 assertEquals(HasNext.RETRY, entryStream.hasNext()); 622 } 623 } 624 625 /** 626 * Test removal of 0 length log from logQueue if the source is a recovered source and size of 627 * logQueue is only 1. 628 */ 629 @Test 630 public void testEOFExceptionForRecoveredQueue() throws Exception { 631 // Create a 0 length log. 632 Path emptyLog = new Path("emptyLog"); 633 FSDataOutputStream fsdos = fs.create(emptyLog); 634 fsdos.close(); 635 assertEquals(0, fs.getFileStatus(emptyLog).getLen()); 636 637 Configuration conf = new Configuration(CONF); 638 // Override the max retries multiplier to fail fast. 639 conf.setInt("replication.source.maxretriesmultiplier", 1); 640 conf.setBoolean("replication.source.eof.autorecovery", true); 641 conf.setInt("replication.source.nb.batches", 10); 642 // Create a reader thread with source as recovered source. 643 ReplicationSource source = mockReplicationSource(true, conf); 644 when(source.isPeerEnabled()).thenReturn(true); 645 646 MetricsSource metrics = mock(MetricsSource.class); 647 doNothing().when(metrics).incrSizeOfLogQueue(); 648 doNothing().when(metrics).decrSizeOfLogQueue(); 649 ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source); 650 localLogQueue.enqueueLog(emptyLog, fakeWalGroupId); 651 ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, localLogQueue, 0, 652 getDummyFilter(), source, fakeWalGroupId); 653 reader.start(); 654 reader.join(); 655 // ReplicationSourceWALReaderThread#handleEofException method will 656 // remove empty log from logQueue. 657 assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId)); 658 } 659 660 @Test 661 public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception { 662 Configuration conf = new Configuration(CONF); 663 MetricsSource metrics = mock(MetricsSource.class); 664 ReplicationSource source = mockReplicationSource(true, conf); 665 ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source); 666 // Create a 0 length log. 667 Path emptyLog = new Path(fs.getHomeDirectory(), "log.2." + isCompressionEnabled); 668 fs.create(emptyLog).close(); 669 assertEquals(0, fs.getFileStatus(emptyLog).getLen()); 670 localLogQueue.enqueueLog(emptyLog, fakeWalGroupId); 671 672 final Path log1 = new Path(fs.getHomeDirectory(), "log.1." + isCompressionEnabled); 673 WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration()); 674 appendEntries(writer1, 3); 675 localLogQueue.enqueueLog(log1, fakeWalGroupId); 676 677 when(source.isPeerEnabled()).thenReturn(true); 678 // Override the max retries multiplier to fail fast. 679 conf.setInt("replication.source.maxretriesmultiplier", 1); 680 conf.setBoolean("replication.source.eof.autorecovery", true); 681 conf.setInt("replication.source.nb.batches", 10); 682 // Create a reader thread. 683 ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, localLogQueue, 0, 684 getDummyFilter(), source, fakeWalGroupId); 685 assertEquals("Initial log queue size is not correct", 2, 686 localLogQueue.getQueueSize(fakeWalGroupId)); 687 reader.start(); 688 reader.join(); 689 690 // remove empty log from logQueue. 691 assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId)); 692 assertEquals("Log queue should be empty", 0, localLogQueue.getQueueSize(fakeWalGroupId)); 693 } 694 695 private PriorityBlockingQueue<Path> getQueue() { 696 return logQueue.getQueue(fakeWalGroupId); 697 } 698 699 private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException { 700 for (int i = 0; i < numEntries; i++) { 701 byte[] b = Bytes.toBytes(Integer.toString(i)); 702 KeyValue kv = new KeyValue(b, b, b); 703 WALEdit edit = new WALEdit(); 704 WALEditInternalHelper.addExtendedCell(edit, kv); 705 WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID); 706 NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); 707 scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL); 708 writer.append(new WAL.Entry(key, edit)); 709 writer.sync(false); 710 } 711 writer.close(); 712 } 713 714 /*** 715 * Tests size of log queue is incremented and decremented properly. 716 */ 717 @Test 718 public void testSizeOfLogQueue() throws Exception { 719 // There should be always 1 log which is current wal. 720 assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue()); 721 appendToLogAndSync(); 722 723 log.rollWriter(); 724 // wait until the previous WAL file is cleanly closed, so later we can aleays see 725 // RETRY_IMMEDIATELY instead of RETRY. The wait here is necessary because the closing of a WAL 726 // writer is asynchronouns 727 TEST_UTIL.waitFor(30000, () -> fs.getClient().isFileClosed(logQueue.getQueue(fakeWalGroupId) 728 .peek().makeQualified(fs.getUri(), fs.getWorkingDirectory()).toUri().getPath())); 729 // After rolling there will be 2 wals in the queue 730 assertEquals(2, logQueue.getMetrics().getSizeOfLogQueue()); 731 732 try (WALEntryStream entryStream = 733 new WALEntryStream(logQueue, fs, CONF, 0, log, logQueue.getMetrics(), fakeWalGroupId)) { 734 // There's one edit in the log, read it. 735 assertNotNull(next(entryStream)); 736 // we've switched to the next WAL, and the previous WAL file is closed cleanly, so it is 737 // RETRY_IMMEDIATELY 738 assertEquals(HasNext.RETRY_IMMEDIATELY, entryStream.hasNext()); 739 } 740 // After removing one wal, size of log queue will be 1 again. 741 assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue()); 742 } 743 744 /** 745 * Tests that wals are closed cleanly and we read the trailer when we remove wal from 746 * WALEntryStream. 747 */ 748 @Test 749 public void testCleanClosedWALs() throws Exception { 750 try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF, 0, log, 751 logQueue.getMetrics(), fakeWalGroupId)) { 752 assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs()); 753 appendToLogAndSync(); 754 assertNotNull(next(entryStream)); 755 log.rollWriter(); 756 appendToLogAndSync(); 757 assertNotNull(next(entryStream)); 758 assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs()); 759 } 760 } 761 762 /** 763 * Tests that we handle EOFException properly if the wal has moved to oldWALs directory. 764 */ 765 @Test 766 public void testEOFExceptionInOldWALsDirectory() throws Exception { 767 assertEquals(1, logQueue.getQueueSize(fakeWalGroupId)); 768 AbstractFSWAL<?> abstractWAL = (AbstractFSWAL<?>) log; 769 Path emptyLogFile = abstractWAL.getCurrentFileName(); 770 log.rollWriter(true); 771 772 // AsyncFSWAl and FSHLog both moves the log from WALs to oldWALs directory asynchronously. 773 // Wait for in flight wal close count to become 0. This makes sure that empty wal is moved to 774 // oldWALs directory. 775 Waiter.waitFor(CONF, 5000, 776 (Waiter.Predicate<Exception>) () -> abstractWAL.getInflightWALCloseCount() == 0); 777 // There will 2 logs in the queue. 778 assertEquals(2, logQueue.getQueueSize(fakeWalGroupId)); 779 780 // Get the archived dir path for the first wal. 781 Path archivePath = AbstractFSWALProvider.findArchivedLog(emptyLogFile, CONF); 782 // Make sure that the wal path is not the same as archived Dir path. 783 assertNotNull(archivePath); 784 assertTrue(fs.exists(archivePath)); 785 fs.truncate(archivePath, 0); 786 // make sure the size of the wal file is 0. 787 assertEquals(0, fs.getFileStatus(archivePath).getLen()); 788 789 ReplicationSource source = Mockito.mock(ReplicationSource.class); 790 when(source.isPeerEnabled()).thenReturn(true); 791 792 Configuration localConf = new Configuration(CONF); 793 localConf.setInt("replication.source.maxretriesmultiplier", 1); 794 localConf.setBoolean("replication.source.eof.autorecovery", true); 795 // Start the reader thread. 796 createReader(false, localConf); 797 // Wait for the replication queue size to be 1. This means that we have handled 798 // 0 length wal from oldWALs directory. 799 Waiter.waitFor(localConf, 10000, 800 (Waiter.Predicate<Exception>) () -> logQueue.getQueueSize(fakeWalGroupId) == 1); 801 } 802 803 /** 804 * This test is for HBASE-27778, when {@link WALEntryFilter#filter} throws exception for some 805 * entries in {@link WALEntryBatch},{@link ReplicationSourceWALReader#totalBufferUsed} should be 806 * decreased because {@link WALEntryBatch} is not put to 807 * {@link ReplicationSourceWALReader#entryBatchQueue}. 808 */ 809 @Test 810 public void testReplicationSourceWALReaderWithPartialWALEntryFailingFilter() throws Exception { 811 appendEntriesToLogAndSync(3); 812 // get ending position 813 long position; 814 try (WALEntryStream entryStream = 815 new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) { 816 for (int i = 0; i < 3; i++) { 817 assertNotNull(next(entryStream)); 818 } 819 position = entryStream.getPosition(); 820 } 821 822 Path walPath = getQueue().peek(); 823 int maxThrowExceptionCount = 3; 824 825 ReplicationSource source = mockReplicationSource(false, CONF); 826 when(source.isPeerEnabled()).thenReturn(true); 827 PartialWALEntryFailingWALEntryFilter walEntryFilter = 828 new PartialWALEntryFailingWALEntryFilter(maxThrowExceptionCount, 3); 829 ReplicationSourceWALReader reader = 830 new ReplicationSourceWALReader(fs, CONF, logQueue, 0, walEntryFilter, source, fakeWalGroupId); 831 reader.start(); 832 WALEntryBatch entryBatch = reader.take(); 833 834 assertNotNull(entryBatch); 835 assertEquals(3, entryBatch.getWalEntries().size()); 836 long sum = entryBatch.getWalEntries().stream() 837 .mapToLong(WALEntryBatch::getEntrySizeExcludeBulkLoad).sum(); 838 assertEquals(position, entryBatch.getLastWalPosition()); 839 assertEquals(walPath, entryBatch.getLastWalPath()); 840 assertEquals(3, entryBatch.getNbRowKeys()); 841 assertEquals(sum, source.getSourceManager().getTotalBufferUsed()); 842 assertEquals(sum, source.getSourceManager().getGlobalMetrics().getWALReaderEditsBufferBytes()); 843 assertEquals(maxThrowExceptionCount, walEntryFilter.getThrowExceptionCount()); 844 assertNull(reader.poll(10)); 845 } 846 847 // testcase for HBASE-28748 848 @Test 849 public void testWALEntryStreamEOFRightAfterHeader() throws Exception { 850 assertEquals(1, logQueue.getQueueSize(fakeWalGroupId)); 851 AbstractFSWAL<?> abstractWAL = (AbstractFSWAL<?>) log; 852 Path emptyLogFile = abstractWAL.getCurrentFileName(); 853 log.rollWriter(true); 854 855 // AsyncFSWAl and FSHLog both moves the log from WALs to oldWALs directory asynchronously. 856 // Wait for in flight wal close count to become 0. This makes sure that empty wal is moved to 857 // oldWALs directory. 858 Waiter.waitFor(CONF, 5000, 859 (Waiter.Predicate<Exception>) () -> abstractWAL.getInflightWALCloseCount() == 0); 860 // There will 2 logs in the queue. 861 assertEquals(2, logQueue.getQueueSize(fakeWalGroupId)); 862 appendToLogAndSync(); 863 864 Path archivedEmptyLogFile = AbstractFSWALProvider.findArchivedLog(emptyLogFile, CONF); 865 866 // read the wal header 867 ByteArrayOutputStream bos = new ByteArrayOutputStream(); 868 bos.write(AbstractProtobufWALReader.PB_WAL_MAGIC); 869 try (FSDataInputStream in = fs.open(archivedEmptyLogFile)) { 870 IOUtils.skipFully(in, AbstractProtobufWALReader.PB_WAL_MAGIC.length); 871 WALHeader header = WALHeader.parseDelimitedFrom(in); 872 header.writeDelimitedTo(bos); 873 } 874 // truncate the first empty log so we have an incomplete header 875 try (FSDataOutputStream out = fs.create(archivedEmptyLogFile, true)) { 876 bos.writeTo(out); 877 } 878 try (WALEntryStream entryStream = 879 new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) { 880 assertEquals(HasNext.RETRY_IMMEDIATELY, entryStream.hasNext()); 881 assertNotNull(next(entryStream)); 882 } 883 } 884 885 private static class PartialWALEntryFailingWALEntryFilter implements WALEntryFilter { 886 private int filteredWALEntryCount = -1; 887 private int walEntryCount = 0; 888 private int throwExceptionCount = -1; 889 private int maxThrowExceptionCount; 890 891 public PartialWALEntryFailingWALEntryFilter(int throwExceptionLimit, int walEntryCount) { 892 this.maxThrowExceptionCount = throwExceptionLimit; 893 this.walEntryCount = walEntryCount; 894 } 895 896 @Override 897 public WAL.Entry filter(WAL.Entry entry) { 898 filteredWALEntryCount++; 899 if (filteredWALEntryCount < walEntryCount - 1) { 900 return entry; 901 } 902 903 filteredWALEntryCount = -1; 904 throwExceptionCount++; 905 if (throwExceptionCount <= maxThrowExceptionCount - 1) { 906 throw new WALEntryFilterRetryableException("failing filter"); 907 } 908 return entry; 909 } 910 911 public int getThrowExceptionCount() { 912 return throwExceptionCount; 913 } 914 } 915 916}