/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.HasNext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;

/**
 * Abstract base for tests of {@link WALEntryStream} and {@link ReplicationSourceWALReader}. Every
 * test is parameterized on {@link #isCompressionEnabled}, which is written to
 * {@link HConstants#ENABLE_WAL_COMPRESSION} before the WAL is initialized.
 */
public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {

  @Parameter
  public boolean isCompressionEnabled;

  @Parameters(name = "{index}: isCompressionEnabled={0}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[] { false }, new Object[] { true });
  }

  @Before
  public void setUp() throws Exception {
    CONF.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, isCompressionEnabled);
    initWAL();
  }

  /**
   * Asserts that the stream reports {@link HasNext#YES} and returns the next entry.
   */
  private Entry next(WALEntryStream entryStream) {
    assertEquals(HasNext.YES, entryStream.hasNext());
    return entryStream.next();
  }

  /**
   * Tests basic reading of log appends
   */
  @Test
  public void testAppendsWithRolls() throws Exception {
    appendToLogAndSync();
    long oldPos;
    try (WALEntryStream entryStream =
      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
      // There's one edit in the log, read it. Reading past it needs to throw exception
      assertEquals(HasNext.YES, entryStream.hasNext());
      WAL.Entry entry = entryStream.peek();
      assertSame(entry, entryStream.next());
      assertNotNull(entry);
      assertEquals(HasNext.RETRY, entryStream.hasNext());
      assertNull(entryStream.peek());
      assertThrows(IllegalStateException.class, () -> entryStream.next());
      oldPos = entryStream.getPosition();
    }

    appendToLogAndSync();

    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF, oldPos, log,
      new MetricsSource("1"), fakeWalGroupId)) {
      // Read the newly added entry, make sure we made progress
      WAL.Entry entry = next(entryStream);
      assertNotEquals(oldPos, entryStream.getPosition());
      assertNotNull(entry);
      oldPos = entryStream.getPosition();
    }

    // We rolled but we still should see the end of the first log and get that item
    appendToLogAndSync();
    log.rollWriter();
    appendToLogAndSync();

    try (WALEntryStreamWithRetries entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF,
      oldPos, log, new MetricsSource("1"), fakeWalGroupId)) {
      WAL.Entry entry = next(entryStream);
      assertNotEquals(oldPos, entryStream.getPosition());
      assertNotNull(entry);

      // next item should come from the new log
      entry = next(entryStream);
      assertNotEquals(oldPos, entryStream.getPosition());
      assertNotNull(entry);

      // no more entries to read, disable retry otherwise we will get a wait too much time error
      entryStream.disableRetry();
      assertEquals(HasNext.RETRY, entryStream.hasNext());
    }
  }

  /**
   * Tests that if after a stream is opened, more entries come in and then the log is rolled, we
   * don't mistakenly dequeue the current log thinking we're done with it
   */
  @Test
  public void testLogRollWhileStreaming() throws Exception {
    appendToLog("1");
    // 2
    appendToLog("2");
    try (WALEntryStreamWithRetries entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF,
      0, log, new MetricsSource("1"), fakeWalGroupId)) {
      assertEquals("1", getRow(next(entryStream)));

      // 3 - comes in after reader opened
      appendToLog("3");
      // log roll happening while we're reading
      log.rollWriter();
      // 4 - this append is in the rolled log
      appendToLog("4");

      assertEquals("2", getRow(next(entryStream)));
      // we should not have dequeued yet since there's still an entry in first log
      assertEquals(2, getQueue().size());
      // if implemented improperly, this would be 4 and 3 would be skipped
      assertEquals("3", getRow(next(entryStream)));
      // 4
      assertEquals("4", getRow(next(entryStream)));
      // now we've dequeued and moved on to next log properly
      assertEquals(1, getQueue().size());

      // disable so we can get the return value immediately, otherwise we will fail with wait too
      // much time...
      entryStream.disableRetry();
      assertEquals(HasNext.RETRY, entryStream.hasNext());
    }
  }

  /**
   * Tests that if writes come in while we have a stream open, we shouldn't miss them
   */
  @Test
  public void testNewEntriesWhileStreaming() throws Exception {
    appendToLog("1");
    try (WALEntryStream entryStream =
      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
      assertNotNull(next(entryStream)); // we've hit the end of the stream at this point

      // some new entries come in while we're streaming
      appendToLog("2");
      appendToLog("3");

      // don't see them
      assertEquals(HasNext.RETRY, entryStream.hasNext());

      // But we do if we retry next time, as the entryStream will reset the reader
      assertEquals("2", getRow(next(entryStream)));
      assertEquals("3", getRow(next(entryStream)));
      // reached the end again
      assertEquals(HasNext.RETRY, entryStream.hasNext());
    }
  }

  /**
   * Tests that a new stream opened at the position reported by a previous stream sees the entries
   * appended after that position.
   */
  @Test
  public void testResumeStreamingFromPosition() throws Exception {
    long lastPosition = 0;
    appendToLog("1");
    try (WALEntryStream entryStream =
      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
      assertNotNull(next(entryStream)); // we've hit the end of the stream at this point
      appendToLog("2");
      appendToLog("3");
      lastPosition = entryStream.getPosition();
    }
    // next stream should pick up where we left off
    try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log,
      new MetricsSource("1"), fakeWalGroupId)) {
      assertEquals("2", getRow(next(entryStream)));
      assertEquals("3", getRow(next(entryStream)));
      assertEquals(HasNext.RETRY, entryStream.hasNext()); // done
      assertEquals(1, getQueue().size());
    }
  }

  /**
   * Tests that if we stop before hitting the end of a stream, we can continue where we left off
   * using the last position
   */
  @Test
  public void testPosition() throws Exception {
    long lastPosition = 0;
    appendEntriesToLogAndSync(3);
    // read only one element
    try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log,
      new MetricsSource("1"), fakeWalGroupId)) {
      assertNotNull(next(entryStream));
      lastPosition = entryStream.getPosition();
    }
    // there should still be two more entries from where we left off
    try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log,
      new MetricsSource("1"), fakeWalGroupId)) {
      assertNotNull(next(entryStream));
      assertNotNull(next(entryStream));
      assertEquals(HasNext.RETRY, entryStream.hasNext());
    }
  }

  /**
   * Tests that a stream over a WAL with no appended entries reports {@link HasNext#RETRY}.
   */
  @Test
  public void testEmptyStream() throws Exception {
    try (WALEntryStream entryStream =
      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
      assertEquals(HasNext.RETRY, entryStream.hasNext());
    }
  }

  /**
   * Tests that a {@link WALKeyImpl} carrying extended attributes survives a protobuf round trip.
   */
  @Test
  public void testWALKeySerialization() throws Exception {
    Map<String, byte[]> attributes = new HashMap<>();
    attributes.put("foo", Bytes.toBytes("foo-value"));
    attributes.put("bar", Bytes.toBytes("bar-value"));
    WALKeyImpl key =
      new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, EnvironmentEdgeManager.currentTime(),
        new ArrayList<UUID>(), 0L, 0L, mvcc, scopes, attributes);
    Assert.assertEquals(attributes, key.getExtendedAttributes());

    WALProtos.WALKey.Builder builder = key.getBuilder(WALCellCodec.getNoneCompressor());
    WALProtos.WALKey serializedKey = builder.build();

    WALKeyImpl deserializedKey = new WALKeyImpl();
    deserializedKey.readFieldsFromPb(serializedKey, WALCellCodec.getNoneUncompressor());

    // equals() only checks region name, sequence id and write time
    Assert.assertEquals(key, deserializedKey);
    // can't use Map.equals() because byte arrays use reference equality
    Assert.assertEquals(key.getExtendedAttributes().keySet(),
      deserializedKey.getExtendedAttributes().keySet());
    for (Map.Entry<String, byte[]> entry : deserializedKey.getExtendedAttributes().entrySet()) {
      Assert.assertArrayEquals(key.getExtendedAttribute(entry.getKey()), entry.getValue());
    }
    Assert.assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes());
  }

  /**
   * Builds a mocked {@link ReplicationSource} that hands out a real
   * {@link ReplicationSourceManager} (constructed with mocked global metrics), a fresh
   * {@link MetricsSource}, the test WAL as file length provider and a mocked {@link Server}.
   * @param recovered value returned by {@link ReplicationSource#isRecovered()}
   * @param conf      configuration passed to the source manager
   */
  private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf)
    throws IOException {
    ReplicationSourceManager mockSourceManager = new ReplicationSourceManager(null, null, conf,
      null, null, null, null, null, null, createMockGlobalMetrics());
    Server mockServer = Mockito.mock(Server.class);
    ReplicationSource source = Mockito.mock(ReplicationSource.class);
    when(source.getSourceManager()).thenReturn(mockSourceManager);
    when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
    when(source.getWALFileLengthProvider()).thenReturn(log);
    when(source.getServer()).thenReturn(mockServer);
    when(source.isRecovered()).thenReturn(recovered);
    return source;
  }

  /**
   * Builds mocked global metrics whose WAL reader edits buffer gauge behaves like a real value:
   * the setter stores into a counter and the getter reads it back.
   */
  private MetricsReplicationGlobalSourceSource createMockGlobalMetrics() {
    MetricsReplicationGlobalSourceSource globalMetrics =
      Mockito.mock(MetricsReplicationGlobalSourceSource.class);
    final AtomicLong bufferUsedCounter = new AtomicLong(0);
    Mockito.doAnswer((invocationOnMock) -> {
      bufferUsedCounter.set(invocationOnMock.getArgument(0, Long.class));
      return null;
    }).when(globalMetrics).setWALReaderEditsBufferBytes(Mockito.anyLong());
    when(globalMetrics.getWALReaderEditsBufferBytes())
      .then(invocationOnMock -> bufferUsedCounter.get());
    return globalMetrics;
  }

  /**
   * Creates and starts a reader over the shared log queue with a pass-through filter and an
   * enabled peer.
   */
  private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf)
    throws IOException {
    ReplicationSource source = mockReplicationSource(recovered, conf);
    when(source.isPeerEnabled()).thenReturn(true);
    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0,
      getDummyFilter(), source, fakeWalGroupId);
    reader.start();
    return reader;
  }

  /**
   * Creates and starts a reader over the shared log queue whose filter throws
   * {@code numFailures} times before letting entries through.
   */
  private ReplicationSourceWALReader createReaderWithBadReplicationFilter(int numFailures,
    Configuration conf) throws IOException {
    ReplicationSource source = mockReplicationSource(false, conf);
    when(source.isPeerEnabled()).thenReturn(true);
    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0,
      getIntermittentFailingFilter(numFailures), source, fakeWalGroupId);
    reader.start();
    return reader;
  }

  @Test
  public void testReplicationSourceWALReader() throws Exception {
    appendEntriesToLogAndSync(3);
    // get ending position
    long position;
    try (WALEntryStream entryStream =
      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
      for (int i = 0; i < 3; i++) {
        assertNotNull(next(entryStream));
      }
      position = entryStream.getPosition();
    }

    // start up a reader
    Path walPath = getQueue().peek();
    ReplicationSourceWALReader reader = createReader(false, CONF);
    WALEntryBatch entryBatch = reader.take();

    // should've batched up our entries
    assertNotNull(entryBatch);
    assertEquals(3, entryBatch.getWalEntries().size());
    assertEquals(position, entryBatch.getLastWalPosition());
    assertEquals(walPath, entryBatch.getLastWalPath());
    assertEquals(3, entryBatch.getNbRowKeys());

    appendToLog("foo");
    entryBatch = reader.take();
    assertEquals(1, entryBatch.getNbEntries());
    assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
  }

  @Test
  public void testReplicationSourceWALReaderWithFailingFilter() throws Exception {
    appendEntriesToLogAndSync(3);
    // get ending position
    long position;
    try (WALEntryStream entryStream =
      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
      for (int i = 0; i < 3; i++) {
        assertNotNull(next(entryStream));
      }
      position = entryStream.getPosition();
    }

    // start up a reader
    Path walPath = getQueue().peek();
    int numFailuresInFilter = 5;
    ReplicationSourceWALReader reader =
      createReaderWithBadReplicationFilter(numFailuresInFilter, CONF);
    WALEntryBatch entryBatch = reader.take();
    assertEquals(numFailuresInFilter, FailingWALEntryFilter.numFailures());

    // should've batched up our entries
    assertNotNull(entryBatch);
    assertEquals(3, entryBatch.getWalEntries().size());
    assertEquals(position, entryBatch.getLastWalPosition());
    assertEquals(walPath, entryBatch.getLastWalPath());
    assertEquals(3, entryBatch.getNbRowKeys());
  }

  @Test
  public void testReplicationSourceWALReaderRecovered() throws Exception {
    appendEntriesToLogAndSync(10);
    Path walPath = getQueue().peek();
    log.rollWriter();
    appendEntriesToLogAndSync(5);
    log.shutdown();

    Configuration conf = new Configuration(CONF);
    conf.setInt("replication.source.nb.capacity", 10);

    ReplicationSourceWALReader reader = createReader(true, conf);

    WALEntryBatch batch = reader.take();
    assertEquals(walPath, batch.getLastWalPath());
    assertEquals(10, batch.getNbEntries());
    assertFalse(batch.isEndOfFile());

    batch = reader.take();
    assertEquals(walPath, batch.getLastWalPath());
    assertEquals(0, batch.getNbEntries());
    assertTrue(batch.isEndOfFile());

    walPath = getQueue().peek();
    batch = reader.take();
    assertEquals(walPath, batch.getLastWalPath());
    assertEquals(5, batch.getNbEntries());
    assertTrue(batch.isEndOfFile());

    assertSame(WALEntryBatch.NO_MORE_DATA, reader.take());
  }

  // Testcase for HBASE-20206
  @Test
  public void testReplicationSourceWALReaderWrongPosition() throws Exception {
    appendEntriesToLogAndSync(1);
    Path walPath = getQueue().peek();
    log.rollWriter();
    appendEntriesToLogAndSync(20);
    TEST_UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {

      @Override
      public boolean evaluate() throws Exception {
        return fs.getFileStatus(walPath).getLen() > 0
          && ((AbstractFSWAL<?>) log).getInflightWALCloseCount() == 0;
      }

      @Override
      public String explainFailure() throws Exception {
        return walPath + " has not been closed yet";
      }

    });

    ReplicationSourceWALReader reader = createReader(false, CONF);

    WALEntryBatch entryBatch = reader.take();
    assertEquals(walPath, entryBatch.getLastWalPath());

    long walLength = fs.getFileStatus(walPath).getLen();
    assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is "
      + walLength, entryBatch.getLastWalPosition() <= walLength);
    assertEquals(1, entryBatch.getNbEntries());
    assertTrue(entryBatch.isEndOfFile());

    Path walPath2 = getQueue().peek();
    entryBatch = reader.take();
    assertEquals(walPath2, entryBatch.getLastWalPath());
    assertEquals(20, entryBatch.getNbEntries());
    assertFalse(entryBatch.isEndOfFile());

    log.rollWriter();
    appendEntriesToLogAndSync(10);
    entryBatch = reader.take();
    assertEquals(walPath2, entryBatch.getLastWalPath());
    assertEquals(0, entryBatch.getNbEntries());
    assertTrue(entryBatch.isEndOfFile());

    Path walPath3 = getQueue().peek();
    entryBatch = reader.take();
    assertEquals(walPath3, entryBatch.getLastWalPath());
    assertEquals(10, entryBatch.getNbEntries());
    assertFalse(entryBatch.isEndOfFile());
  }

  @Test
  public void testReplicationSourceWALReaderDisabled()
    throws IOException, InterruptedException, ExecutionException {
    appendEntriesToLogAndSync(3);
    // get ending position
    long position;
    try (WALEntryStream entryStream =
      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
      for (int i = 0; i < 3; i++) {
        assertNotNull(next(entryStream));
      }
      position = entryStream.getPosition();
    }

    // start up a reader
    Path walPath = getQueue().peek();
    ReplicationSource source = mockReplicationSource(false, CONF);
    AtomicInteger invokeCount = new AtomicInteger(0);
    AtomicBoolean enabled = new AtomicBoolean(false);
    when(source.isPeerEnabled()).then(i -> {
      invokeCount.incrementAndGet();
      return enabled.get();
    });

    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, CONF, logQueue, 0,
      getDummyFilter(), source, fakeWalGroupId);
    reader.start();
    Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
      return reader.take();
    });
    // make sure that the isPeerEnabled has been called several times
    TEST_UTIL.waitFor(30000, () -> invokeCount.get() >= 5);
    // confirm that we can read nothing if the peer is disabled
    assertFalse(future.isDone());
    // then enable the peer, we should get the batch
    enabled.set(true);
    WALEntryBatch entryBatch = future.get();

    // should've batched up our entries
    assertNotNull(entryBatch);
    assertEquals(3, entryBatch.getWalEntries().size());
    assertEquals(position, entryBatch.getLastWalPosition());
    assertEquals(walPath, entryBatch.getLastWalPath());
    assertEquals(3, entryBatch.getNbRowKeys());
  }

  /**
   * Returns the row of the first cell of the entry's edit, decoded as a string.
   */
  private String getRow(WAL.Entry entry) {
    Cell cell = entry.getEdit().getCells().get(0);
    return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
  }

  /**
   * Appends a single edit for row {@code key} to the test WAL and syncs it.
   */
  private void appendToLog(String key) throws IOException {
    final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
      EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdit(key));
    log.sync(txid);
  }

  /**
   * Performs {@code count} single-entry appends and then syncs up to the last returned txid.
   */
  private void appendEntriesToLogAndSync(int count) throws IOException {
    long txid = -1L;
    for (int i = 0; i < count; i++) {
      // resolves to the int overload, not to appendToLog(String) above
      txid = appendToLog(1);
    }
    log.sync(txid);
  }

  /**
   * Builds an edit holding one cell for {@code row}; the qualifier bytes double as the value.
   */
  private WALEdit getWALEdit(String row) {
    WALEdit edit = new WALEdit();
    edit.add(new KeyValue(Bytes.toBytes(row), family, qualifier,
      EnvironmentEdgeManager.currentTime(), qualifier));
    return edit;
  }

  /**
   * Returns a filter that passes every entry through unchanged.
   */
  private WALEntryFilter getDummyFilter() {
    return new WALEntryFilter() {

      @Override
      public Entry filter(Entry entry) {
        return entry;
      }
    };
  }

  private WALEntryFilter getIntermittentFailingFilter(int numFailuresInFilter) {
    return new FailingWALEntryFilter(numFailuresInFilter);
  }

  /**
   * Filter that throws {@link WALEntryFilterRetryableException} for the first
   * {@code numFailuresInFilter} invocations and passes entries through afterwards.
   * <p>
   * The failure counter is static so that {@link #numFailures()} can be queried without a
   * reference to the instance. It is reset whenever a new filter is constructed; otherwise the
   * count from a previous test (for example the other parameterized run) would leak in and the
   * new filter would never fail.
   */
  public static class FailingWALEntryFilter implements WALEntryFilter {
    // Written by the reader thread, read by the test thread.
    private static final AtomicInteger COUNT_FAILURES = new AtomicInteger(0);
    private final int numFailures;

    public FailingWALEntryFilter(int numFailuresInFilter) {
      numFailures = numFailuresInFilter;
      COUNT_FAILURES.set(0);
    }

    @Override
    public Entry filter(Entry entry) {
      if (COUNT_FAILURES.get() >= numFailures) {
        return entry;
      }
      COUNT_FAILURES.incrementAndGet();
      throw new WALEntryFilterRetryableException("failing filter");
    }

    /**
     * Returns how many times a filter has thrown since the most recent filter was constructed.
     */
    public static int numFailures() {
      return COUNT_FAILURES.get();
    }
  }

  /**
   * Tests that entries beyond the length reported by the file length provider are not returned
   * until the reported length covers them.
   */
  @Test
  public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
    appendToLog("1");
    appendToLog("2");
    long size = log.getLogFileSizeIfBeingWritten(getQueue().peek()).getAsLong();
    AtomicLong fileLength = new AtomicLong(size - 1);
    try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, 0,
      p -> OptionalLong.of(fileLength.get()), new MetricsSource("1"), fakeWalGroupId)) {
      assertNotNull(next(entryStream));
      // can not get log 2
      assertEquals(HasNext.RETRY, entryStream.hasNext());
      Thread.sleep(1000);
      // still can not get log 2
      assertEquals(HasNext.RETRY, entryStream.hasNext());

      // can get log 2 now
      fileLength.set(size);
      assertNotNull(next(entryStream));

      assertEquals(HasNext.RETRY, entryStream.hasNext());
    }
  }

  /**
   * Test removal of 0 length log from logQueue if the source is a recovered source and size of
   * logQueue is only 1.
   */
  @Test
  public void testEOFExceptionForRecoveredQueue() throws Exception {
    // Create a 0 length log.
    Path emptyLog = new Path("emptyLog");
    FSDataOutputStream fsdos = fs.create(emptyLog);
    fsdos.close();
    assertEquals(0, fs.getFileStatus(emptyLog).getLen());

    Configuration conf = new Configuration(CONF);
    // Override the max retries multiplier to fail fast.
    conf.setInt("replication.source.maxretriesmultiplier", 1);
    conf.setBoolean("replication.source.eof.autorecovery", true);
    conf.setInt("replication.source.nb.batches", 10);
    // Create a reader thread with source as recovered source.
    ReplicationSource source = mockReplicationSource(true, conf);
    when(source.isPeerEnabled()).thenReturn(true);

    MetricsSource metrics = mock(MetricsSource.class);
    doNothing().when(metrics).incrSizeOfLogQueue();
    doNothing().when(metrics).decrSizeOfLogQueue();
    ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
    localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
      getDummyFilter(), source, fakeWalGroupId);
    reader.start();
    reader.join();
    // ReplicationSourceWALReaderThread#handleEofException method will
    // remove empty log from logQueue.
    assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
  }

  /**
   * Same as {@link #testEOFExceptionForRecoveredQueue()} but with a second, non-empty log
   * enqueued after the 0 length one.
   */
  @Test
  public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception {
    Configuration conf = new Configuration(CONF);
    MetricsSource metrics = mock(MetricsSource.class);
    ReplicationSource source = mockReplicationSource(true, conf);
    ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
    // Create a 0 length log.
    Path emptyLog = new Path(fs.getHomeDirectory(), "log.2." + isCompressionEnabled);
    fs.create(emptyLog).close();
    assertEquals(0, fs.getFileStatus(emptyLog).getLen());
    localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);

    final Path log1 = new Path(fs.getHomeDirectory(), "log.1." + isCompressionEnabled);
    WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration());
    appendEntries(writer1, 3);
    localLogQueue.enqueueLog(log1, fakeWalGroupId);

    when(source.isPeerEnabled()).thenReturn(true);
    // Override the max retries multiplier to fail fast.
    conf.setInt("replication.source.maxretriesmultiplier", 1);
    conf.setBoolean("replication.source.eof.autorecovery", true);
    conf.setInt("replication.source.nb.batches", 10);
    // Create a reader thread.
    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
      getDummyFilter(), source, fakeWalGroupId);
    assertEquals("Initial log queue size is not correct", 2,
      localLogQueue.getQueueSize(fakeWalGroupId));
    reader.start();
    reader.join();

    // Once the reader has finished, neither of the two logs should remain in the queue.
    assertEquals("Log queue should be empty", 0, localLogQueue.getQueueSize(fakeWalGroupId));
  }

  private PriorityBlockingQueue<Path> getQueue() {
    return logQueue.getQueue(fakeWalGroupId);
  }

  /**
   * Writes {@code numEntries} single-cell entries through {@code writer}, syncing after each one,
   * and closes the writer afterwards.
   */
  private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
    for (int i = 0; i < numEntries; i++) {
      byte[] b = Bytes.toBytes(Integer.toString(i));
      KeyValue kv = new KeyValue(b, b, b);
      WALEdit edit = new WALEdit();
      edit.add(kv);
      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID);
      // NOTE(review): this local map is populated but never passed to the key or the writer.
      NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
      scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
      writer.append(new WAL.Entry(key, edit));
      writer.sync(false);
    }
    writer.close();
  }

  /**
   * Tests size of log queue is incremented and decremented properly.
   */
  @Test
  public void testSizeOfLogQueue() throws Exception {
    // There should be always 1 log which is current wal.
    assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue());
    appendToLogAndSync();

    log.rollWriter();
    // wait until the previous WAL file is cleanly closed, so later we can always see
    // RETRY_IMMEDIATELY instead of RETRY. The wait here is necessary because the closing of a WAL
    // writer is asynchronous
    TEST_UTIL.waitFor(30000, () -> fs.getClient().isFileClosed(logQueue.getQueue(fakeWalGroupId)
      .peek().makeQualified(fs.getUri(), fs.getWorkingDirectory()).toUri().getPath()));
    // After rolling there will be 2 wals in the queue
    assertEquals(2, logQueue.getMetrics().getSizeOfLogQueue());

    try (WALEntryStream entryStream =
      new WALEntryStream(logQueue, fs, CONF, 0, log, logQueue.getMetrics(), fakeWalGroupId)) {
      // There's one edit in the log, read it.
      assertNotNull(next(entryStream));
      // we've switched to the next WAL, and the previous WAL file is closed cleanly, so it is
      // RETRY_IMMEDIATELY
      assertEquals(HasNext.RETRY_IMMEDIATELY, entryStream.hasNext());
    }
    // After removing one wal, size of log queue will be 1 again.
    assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue());
  }

  /**
   * Tests that wals are closed cleanly and we read the trailer when we remove wal from
   * WALEntryStream.
   */
  @Test
  public void testCleanClosedWALs() throws Exception {
    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF, 0, log,
      logQueue.getMetrics(), fakeWalGroupId)) {
      assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
      appendToLogAndSync();
      assertNotNull(next(entryStream));
      log.rollWriter();
      appendToLogAndSync();
      assertNotNull(next(entryStream));
      assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
    }
  }

  /**
   * Tests that we handle EOFException properly if the wal has moved to oldWALs directory.
   */
  @Test
  public void testEOFExceptionInOldWALsDirectory() throws Exception {
    assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
    AbstractFSWAL<?> abstractWAL = (AbstractFSWAL<?>) log;
    Path emptyLogFile = abstractWAL.getCurrentFileName();
    log.rollWriter(true);

    // AsyncFSWAL and FSHLog both move the log from WALs to oldWALs directory asynchronously.
    // Wait for in flight wal close count to become 0. This makes sure that empty wal is moved to
    // oldWALs directory.
    Waiter.waitFor(CONF, 5000,
      (Waiter.Predicate<Exception>) () -> abstractWAL.getInflightWALCloseCount() == 0);
    // There will be 2 logs in the queue.
    assertEquals(2, logQueue.getQueueSize(fakeWalGroupId));

    // Get the archived dir path for the first wal.
    Path archivePath = AbstractFSWALProvider.findArchivedLog(emptyLogFile, CONF);
    // Make sure that the archived wal was found and actually exists.
    assertNotNull(archivePath);
    assertTrue(fs.exists(archivePath));
    fs.truncate(archivePath, 0);
    // make sure the size of the wal file is 0.
    assertEquals(0, fs.getFileStatus(archivePath).getLen());

    Configuration localConf = new Configuration(CONF);
    localConf.setInt("replication.source.maxretriesmultiplier", 1);
    localConf.setBoolean("replication.source.eof.autorecovery", true);
    // Start the reader thread. createReader builds its own mocked source with the peer enabled.
    createReader(false, localConf);
    // Wait for the replication queue size to be 1. This means that we have handled
    // 0 length wal from oldWALs directory.
    Waiter.waitFor(localConf, 10000,
      (Waiter.Predicate<Exception>) () -> logQueue.getQueueSize(fakeWalGroupId) == 1);
  }

  /**
   * This test is for HBASE-27778, when {@link WALEntryFilter#filter} throws exception for some
   * entries in {@link WALEntryBatch},{@link ReplicationSourceWALReader#totalBufferUsed} should be
   * decreased because {@link WALEntryBatch} is not put to
   * {@link ReplicationSourceWALReader#entryBatchQueue}.
   */
  @Test
  public void testReplicationSourceWALReaderWithPartialWALEntryFailingFilter() throws Exception {
    appendEntriesToLogAndSync(3);
    // get ending position
    long position;
    try (WALEntryStream entryStream =
      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
      for (int i = 0; i < 3; i++) {
        assertNotNull(next(entryStream));
      }
      position = entryStream.getPosition();
    }

    Path walPath = getQueue().peek();
    int maxThrowExceptionCount = 3;

    ReplicationSource source = mockReplicationSource(false, CONF);
    when(source.isPeerEnabled()).thenReturn(true);
    PartialWALEntryFailingWALEntryFilter walEntryFilter =
      new PartialWALEntryFailingWALEntryFilter(maxThrowExceptionCount, 3);
    ReplicationSourceWALReader reader =
      new ReplicationSourceWALReader(fs, CONF, logQueue, 0, walEntryFilter, source, fakeWalGroupId);
    reader.start();
    WALEntryBatch entryBatch = reader.take();

    assertNotNull(entryBatch);
    assertEquals(3, entryBatch.getWalEntries().size());
    long sum = entryBatch.getWalEntries().stream()
      .mapToLong(WALEntryBatch::getEntrySizeExcludeBulkLoad).sum();
    assertEquals(position, entryBatch.getLastWalPosition());
    assertEquals(walPath, entryBatch.getLastWalPath());
    assertEquals(3, entryBatch.getNbRowKeys());
    assertEquals(sum, source.getSourceManager().getTotalBufferUsed());
    assertEquals(sum, source.getSourceManager().getGlobalMetrics().getWALReaderEditsBufferBytes());
    assertEquals(maxThrowExceptionCount, walEntryFilter.getThrowExceptionCount());
    assertNull(reader.poll(10));
  }

  /**
   * Filter that lets the first {@code walEntryCount - 1} entries of each pass through and then,
   * on the last entry, throws {@link WALEntryFilterRetryableException} for the first
   * {@code throwExceptionLimit} passes. Once the limit is reached the last entry is passed
   * through as well.
   */
  private static class PartialWALEntryFailingWALEntryFilter implements WALEntryFilter {
    // Index of the entry within the current pass; -1 means no entry has been seen yet.
    private int filteredWALEntryCount = -1;
    private final int walEntryCount;
    // Number of completed passes minus one; starts at -1 so the first pass counts as 0.
    private int throwExceptionCount = -1;
    private final int maxThrowExceptionCount;

    public PartialWALEntryFailingWALEntryFilter(int throwExceptionLimit, int walEntryCount) {
      this.maxThrowExceptionCount = throwExceptionLimit;
      this.walEntryCount = walEntryCount;
    }

    @Override
    public Entry filter(Entry entry) {
      filteredWALEntryCount++;
      if (filteredWALEntryCount < walEntryCount - 1) {
        return entry;
      }

      // Last entry of the pass: restart the per-pass index and decide whether to fail.
      filteredWALEntryCount = -1;
      throwExceptionCount++;
      if (throwExceptionCount <= maxThrowExceptionCount - 1) {
        throw new WALEntryFilterRetryableException("failing filter");
      }
      return entry;
    }

    public int getThrowExceptionCount() {
      return throwExceptionCount;
    }
  }

}