001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertSame;
026import static org.junit.Assert.assertThrows;
027import static org.junit.Assert.assertTrue;
028import static org.mockito.Mockito.doNothing;
029import static org.mockito.Mockito.mock;
030import static org.mockito.Mockito.when;
031
032import java.io.ByteArrayOutputStream;
033import java.io.IOException;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.HashMap;
037import java.util.Map;
038import java.util.NavigableMap;
039import java.util.OptionalLong;
040import java.util.TreeMap;
041import java.util.UUID;
042import java.util.concurrent.ExecutionException;
043import java.util.concurrent.ForkJoinPool;
044import java.util.concurrent.Future;
045import java.util.concurrent.PriorityBlockingQueue;
046import java.util.concurrent.atomic.AtomicBoolean;
047import java.util.concurrent.atomic.AtomicInteger;
048import java.util.concurrent.atomic.AtomicLong;
049import org.apache.commons.io.IOUtils;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.fs.FSDataInputStream;
052import org.apache.hadoop.fs.FSDataOutputStream;
053import org.apache.hadoop.fs.Path;
054import org.apache.hadoop.hbase.Cell;
055import org.apache.hadoop.hbase.HConstants;
056import org.apache.hadoop.hbase.KeyValue;
057import org.apache.hadoop.hbase.Server;
058import org.apache.hadoop.hbase.TableName;
059import org.apache.hadoop.hbase.Waiter;
060import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
061import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
062import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
063import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
064import org.apache.hadoop.hbase.replication.WALEntryFilter;
065import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.HasNext;
066import org.apache.hadoop.hbase.util.Bytes;
067import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
068import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
069import org.apache.hadoop.hbase.wal.WAL;
070import org.apache.hadoop.hbase.wal.WALEdit;
071import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
072import org.apache.hadoop.hbase.wal.WALFactory;
073import org.apache.hadoop.hbase.wal.WALKeyImpl;
074import org.apache.hadoop.hbase.wal.WALProvider;
075import org.junit.Assert;
076import org.junit.Before;
077import org.junit.Test;
078import org.junit.runners.Parameterized.Parameter;
079import org.junit.runners.Parameterized.Parameters;
080import org.mockito.Mockito;
081
082import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
084
085public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
086
  /**
   * Injected parameter of the current run; {@link #setUp()} copies it into
   * {@link HConstants#ENABLE_WAL_COMPRESSION} before the WAL is initialized.
   */
  @Parameter
  public boolean isCompressionEnabled;
089
090  @Parameters(name = "{index}: isCompressionEnabled={0}")
091  public static Iterable<Object[]> data() {
092    return Arrays.asList(new Object[] { false }, new Object[] { true });
093  }
094
095  @Before
096  public void setUp() throws Exception {
097    CONF.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, isCompressionEnabled);
098    initWAL();
099  }
100
101  private WAL.Entry next(WALEntryStream entryStream) {
102    assertEquals(HasNext.YES, entryStream.hasNext());
103    return entryStream.next();
104  }
105
106  /**
107   * Tests basic reading of log appends
108   */
109  @Test
110  public void testAppendsWithRolls() throws Exception {
111    appendToLogAndSync();
112    long oldPos;
113    try (WALEntryStream entryStream =
114      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
115      // There's one edit in the log, read it. Reading past it needs to throw exception
116      assertEquals(HasNext.YES, entryStream.hasNext());
117      WAL.Entry entry = entryStream.peek();
118      assertSame(entry, entryStream.next());
119      assertNotNull(entry);
120      assertEquals(HasNext.RETRY, entryStream.hasNext());
121      assertNull(entryStream.peek());
122      assertThrows(IllegalStateException.class, () -> entryStream.next());
123      oldPos = entryStream.getPosition();
124    }
125
126    appendToLogAndSync();
127
128    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF, oldPos, log,
129      new MetricsSource("1"), fakeWalGroupId)) {
130      // Read the newly added entry, make sure we made progress
131      WAL.Entry entry = next(entryStream);
132      assertNotEquals(oldPos, entryStream.getPosition());
133      assertNotNull(entry);
134      oldPos = entryStream.getPosition();
135    }
136
137    // We rolled but we still should see the end of the first log and get that item
138    appendToLogAndSync();
139    log.rollWriter();
140    appendToLogAndSync();
141
142    try (WALEntryStreamWithRetries entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF,
143      oldPos, log, new MetricsSource("1"), fakeWalGroupId)) {
144      WAL.Entry entry = next(entryStream);
145      assertNotEquals(oldPos, entryStream.getPosition());
146      assertNotNull(entry);
147
148      // next item should come from the new log
149      entry = next(entryStream);
150      assertNotEquals(oldPos, entryStream.getPosition());
151      assertNotNull(entry);
152
153      // no more entries to read, disable retry otherwise we will get a wait too much time error
154      entryStream.disableRetry();
155      assertEquals(HasNext.RETRY, entryStream.hasNext());
156      oldPos = entryStream.getPosition();
157    }
158  }
159
160  /**
161   * Tests that if after a stream is opened, more entries come in and then the log is rolled, we
162   * don't mistakenly dequeue the current log thinking we're done with it
163   */
164  @Test
165  public void testLogRollWhileStreaming() throws Exception {
166    appendToLog("1");
167    // 2
168    appendToLog("2");
169    try (WALEntryStreamWithRetries entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF,
170      0, log, new MetricsSource("1"), fakeWalGroupId)) {
171      assertEquals("1", getRow(next(entryStream)));
172
173      // 3 - comes in after reader opened
174      appendToLog("3");
175      // log roll happening while we're reading
176      log.rollWriter();
177      // 4 - this append is in the rolled log
178      appendToLog("4");
179
180      assertEquals("2", getRow(next(entryStream)));
181      // we should not have dequeued yet since there's still an entry in first log
182      assertEquals(2, getQueue().size());
183      // if implemented improperly, this would be 4 and 3 would be skipped
184      assertEquals("3", getRow(next(entryStream)));
185      // 4
186      assertEquals("4", getRow(next(entryStream)));
187      // now we've dequeued and moved on to next log properly
188      assertEquals(1, getQueue().size());
189
190      // disable so we can get the return value immediately, otherwise we will fail with wait too
191      // much time...
192      entryStream.disableRetry();
193      assertEquals(HasNext.RETRY, entryStream.hasNext());
194    }
195  }
196
197  /**
198   * Tests that if writes come in while we have a stream open, we shouldn't miss them
199   */
200
201  @Test
202  public void testNewEntriesWhileStreaming() throws Exception {
203    appendToLog("1");
204    try (WALEntryStream entryStream =
205      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
206      assertNotNull(next(entryStream)); // we've hit the end of the stream at this point
207
208      // some new entries come in while we're streaming
209      appendToLog("2");
210      appendToLog("3");
211
212      // don't see them
213      assertEquals(HasNext.RETRY, entryStream.hasNext());
214
215      // But we do if we retry next time, as the entryStream will reset the reader
216      assertEquals("2", getRow(next(entryStream)));
217      assertEquals("3", getRow(next(entryStream)));
218      // reached the end again
219      assertEquals(HasNext.RETRY, entryStream.hasNext());
220    }
221  }
222
223  @Test
224  public void testResumeStreamingFromPosition() throws Exception {
225    long lastPosition = 0;
226    appendToLog("1");
227    try (WALEntryStream entryStream =
228      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
229      assertNotNull(next(entryStream)); // we've hit the end of the stream at this point
230      appendToLog("2");
231      appendToLog("3");
232      lastPosition = entryStream.getPosition();
233    }
234    // next stream should picks up where we left off
235    try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log,
236      new MetricsSource("1"), fakeWalGroupId)) {
237      assertEquals("2", getRow(next(entryStream)));
238      assertEquals("3", getRow(next(entryStream)));
239      assertEquals(HasNext.RETRY, entryStream.hasNext()); // done
240      assertEquals(1, getQueue().size());
241    }
242  }
243
244  /**
245   * Tests that if we stop before hitting the end of a stream, we can continue where we left off
246   * using the last position
247   */
248  @Test
249  public void testPosition() throws Exception {
250    long lastPosition = 0;
251    appendEntriesToLogAndSync(3);
252    // read only one element
253    try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log,
254      new MetricsSource("1"), fakeWalGroupId)) {
255      assertNotNull(next(entryStream));
256      lastPosition = entryStream.getPosition();
257    }
258    // there should still be two more entries from where we left off
259    try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log,
260      new MetricsSource("1"), fakeWalGroupId)) {
261      assertNotNull(next(entryStream));
262      assertNotNull(next(entryStream));
263      assertEquals(HasNext.RETRY, entryStream.hasNext());
264    }
265  }
266
267  @Test
268  public void testEmptyStream() throws Exception {
269    try (WALEntryStream entryStream =
270      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
271      assertEquals(HasNext.RETRY, entryStream.hasNext());
272    }
273  }
274
275  @Test
276  public void testWALKeySerialization() throws Exception {
277    Map<String, byte[]> attributes = new HashMap<String, byte[]>();
278    attributes.put("foo", Bytes.toBytes("foo-value"));
279    attributes.put("bar", Bytes.toBytes("bar-value"));
280    WALKeyImpl key =
281      new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, EnvironmentEdgeManager.currentTime(),
282        new ArrayList<UUID>(), 0L, 0L, mvcc, scopes, attributes);
283    Assert.assertEquals(attributes, key.getExtendedAttributes());
284
285    WALProtos.WALKey.Builder builder = key.getBuilder(WALCellCodec.getNoneCompressor());
286    WALProtos.WALKey serializedKey = builder.build();
287
288    WALKeyImpl deserializedKey = new WALKeyImpl();
289    deserializedKey.readFieldsFromPb(serializedKey, WALCellCodec.getNoneUncompressor());
290
291    // equals() only checks region name, sequence id and write time
292    Assert.assertEquals(key, deserializedKey);
293    // can't use Map.equals() because byte arrays use reference equality
294    Assert.assertEquals(key.getExtendedAttributes().keySet(),
295      deserializedKey.getExtendedAttributes().keySet());
296    for (Map.Entry<String, byte[]> entry : deserializedKey.getExtendedAttributes().entrySet()) {
297      Assert.assertArrayEquals(key.getExtendedAttribute(entry.getKey()), entry.getValue());
298    }
299    Assert.assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes());
300  }
301
302  private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf)
303    throws IOException {
304    ReplicationSourceManager mockSourceManager = new ReplicationSourceManager(null, null, conf,
305      null, null, null, null, null, null, null, createMockGlobalMetrics());
306    Server mockServer = Mockito.mock(Server.class);
307    ReplicationSource source = Mockito.mock(ReplicationSource.class);
308    when(source.getSourceManager()).thenReturn(mockSourceManager);
309    when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
310    when(source.getWALFileLengthProvider()).thenReturn(log);
311    when(source.getServer()).thenReturn(mockServer);
312    when(source.isRecovered()).thenReturn(recovered);
313    return source;
314  }
315
316  private MetricsReplicationGlobalSourceSource createMockGlobalMetrics() {
317    MetricsReplicationGlobalSourceSource globalMetrics =
318      Mockito.mock(MetricsReplicationGlobalSourceSource.class);
319    final AtomicLong bufferUsedCounter = new AtomicLong(0);
320    Mockito.doAnswer((invocationOnMock) -> {
321      bufferUsedCounter.set(invocationOnMock.getArgument(0, Long.class));
322      return null;
323    }).when(globalMetrics).setWALReaderEditsBufferBytes(Mockito.anyLong());
324    when(globalMetrics.getWALReaderEditsBufferBytes())
325      .then(invocationOnMock -> bufferUsedCounter.get());
326    return globalMetrics;
327  }
328
329  private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf)
330    throws IOException {
331    ReplicationSource source = mockReplicationSource(recovered, conf);
332    when(source.isPeerEnabled()).thenReturn(true);
333    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0,
334      getDummyFilter(), source, fakeWalGroupId);
335    reader.start();
336    return reader;
337  }
338
339  private ReplicationSourceWALReader createReaderWithBadReplicationFilter(int numFailures,
340    Configuration conf) throws IOException {
341    ReplicationSource source = mockReplicationSource(false, conf);
342    when(source.isPeerEnabled()).thenReturn(true);
343    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0,
344      getIntermittentFailingFilter(numFailures), source, fakeWalGroupId);
345    reader.start();
346    return reader;
347  }
348
349  @Test
350  public void testReplicationSourceWALReader() throws Exception {
351    appendEntriesToLogAndSync(3);
352    // get ending position
353    long position;
354    try (WALEntryStream entryStream =
355      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
356      for (int i = 0; i < 3; i++) {
357        assertNotNull(next(entryStream));
358      }
359      position = entryStream.getPosition();
360    }
361
362    // start up a reader
363    Path walPath = getQueue().peek();
364    ReplicationSourceWALReader reader = createReader(false, CONF);
365    WALEntryBatch entryBatch = reader.take();
366
367    // should've batched up our entries
368    assertNotNull(entryBatch);
369    assertEquals(3, entryBatch.getWalEntries().size());
370    assertEquals(position, entryBatch.getLastWalPosition());
371    assertEquals(walPath, entryBatch.getLastWalPath());
372    assertEquals(3, entryBatch.getNbRowKeys());
373
374    appendToLog("foo");
375    entryBatch = reader.take();
376    assertEquals(1, entryBatch.getNbEntries());
377    assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
378  }
379
380  @Test
381  public void testReplicationSourceWALReaderWithFailingFilter() throws Exception {
382    appendEntriesToLogAndSync(3);
383    // get ending position
384    long position;
385    try (WALEntryStream entryStream =
386      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
387      for (int i = 0; i < 3; i++) {
388        assertNotNull(next(entryStream));
389      }
390      position = entryStream.getPosition();
391    }
392
393    // start up a reader
394    Path walPath = getQueue().peek();
395    int numFailuresInFilter = 5;
396    ReplicationSourceWALReader reader =
397      createReaderWithBadReplicationFilter(numFailuresInFilter, CONF);
398    WALEntryBatch entryBatch = reader.take();
399    assertEquals(numFailuresInFilter, FailingWALEntryFilter.numFailures());
400
401    // should've batched up our entries
402    assertNotNull(entryBatch);
403    assertEquals(3, entryBatch.getWalEntries().size());
404    assertEquals(position, entryBatch.getLastWalPosition());
405    assertEquals(walPath, entryBatch.getLastWalPath());
406    assertEquals(3, entryBatch.getNbRowKeys());
407  }
408
409  @Test
410  public void testReplicationSourceWALReaderRecovered() throws Exception {
411    appendEntriesToLogAndSync(10);
412    Path walPath = getQueue().peek();
413    log.rollWriter();
414    appendEntriesToLogAndSync(5);
415    log.shutdown();
416
417    Configuration conf = new Configuration(CONF);
418    conf.setInt("replication.source.nb.capacity", 10);
419
420    ReplicationSourceWALReader reader = createReader(true, conf);
421
422    WALEntryBatch batch = reader.take();
423    assertEquals(walPath, batch.getLastWalPath());
424    assertEquals(10, batch.getNbEntries());
425    assertFalse(batch.isEndOfFile());
426
427    batch = reader.take();
428    assertEquals(walPath, batch.getLastWalPath());
429    assertEquals(0, batch.getNbEntries());
430    assertTrue(batch.isEndOfFile());
431
432    walPath = getQueue().peek();
433    batch = reader.take();
434    assertEquals(walPath, batch.getLastWalPath());
435    assertEquals(5, batch.getNbEntries());
436    assertTrue(batch.isEndOfFile());
437
438    assertSame(WALEntryBatch.NO_MORE_DATA, reader.take());
439  }
440
441  // Testcase for HBASE-20206
442  @Test
443  public void testReplicationSourceWALReaderWrongPosition() throws Exception {
444    appendEntriesToLogAndSync(1);
445    Path walPath = getQueue().peek();
446    log.rollWriter();
447    appendEntriesToLogAndSync(20);
448    TEST_UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {
449
450      @Override
451      public boolean evaluate() throws Exception {
452        return fs.getFileStatus(walPath).getLen() > 0
453          && ((AbstractFSWAL<?>) log).getInflightWALCloseCount() == 0;
454      }
455
456      @Override
457      public String explainFailure() throws Exception {
458        return walPath + " has not been closed yet";
459      }
460
461    });
462
463    ReplicationSourceWALReader reader = createReader(false, CONF);
464
465    WALEntryBatch entryBatch = reader.take();
466    assertEquals(walPath, entryBatch.getLastWalPath());
467
468    long walLength = fs.getFileStatus(walPath).getLen();
469    assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is "
470      + walLength, entryBatch.getLastWalPosition() <= walLength);
471    assertEquals(1, entryBatch.getNbEntries());
472    assertTrue(entryBatch.isEndOfFile());
473
474    Path walPath2 = getQueue().peek();
475    entryBatch = reader.take();
476    assertEquals(walPath2, entryBatch.getLastWalPath());
477    assertEquals(20, entryBatch.getNbEntries());
478    assertFalse(entryBatch.isEndOfFile());
479
480    log.rollWriter();
481    appendEntriesToLogAndSync(10);
482    entryBatch = reader.take();
483    assertEquals(walPath2, entryBatch.getLastWalPath());
484    assertEquals(0, entryBatch.getNbEntries());
485    assertTrue(entryBatch.isEndOfFile());
486
487    Path walPath3 = getQueue().peek();
488    entryBatch = reader.take();
489    assertEquals(walPath3, entryBatch.getLastWalPath());
490    assertEquals(10, entryBatch.getNbEntries());
491    assertFalse(entryBatch.isEndOfFile());
492  }
493
494  @Test
495  public void testReplicationSourceWALReaderDisabled()
496    throws IOException, InterruptedException, ExecutionException {
497    appendEntriesToLogAndSync(3);
498    // get ending position
499    long position;
500    try (WALEntryStream entryStream =
501      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
502      for (int i = 0; i < 3; i++) {
503        assertNotNull(next(entryStream));
504      }
505      position = entryStream.getPosition();
506    }
507
508    // start up a reader
509    Path walPath = getQueue().peek();
510    ReplicationSource source = mockReplicationSource(false, CONF);
511    AtomicInteger invokeCount = new AtomicInteger(0);
512    AtomicBoolean enabled = new AtomicBoolean(false);
513    when(source.isPeerEnabled()).then(i -> {
514      invokeCount.incrementAndGet();
515      return enabled.get();
516    });
517
518    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, CONF, logQueue, 0,
519      getDummyFilter(), source, fakeWalGroupId);
520    reader.start();
521    Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
522      return reader.take();
523    });
524    // make sure that the isPeerEnabled has been called several times
525    TEST_UTIL.waitFor(30000, () -> invokeCount.get() >= 5);
526    // confirm that we can read nothing if the peer is disabled
527    assertFalse(future.isDone());
528    // then enable the peer, we should get the batch
529    enabled.set(true);
530    WALEntryBatch entryBatch = future.get();
531
532    // should've batched up our entries
533    assertNotNull(entryBatch);
534    assertEquals(3, entryBatch.getWalEntries().size());
535    assertEquals(position, entryBatch.getLastWalPosition());
536    assertEquals(walPath, entryBatch.getLastWalPath());
537    assertEquals(3, entryBatch.getNbRowKeys());
538  }
539
540  private String getRow(WAL.Entry entry) {
541    Cell cell = entry.getEdit().getCells().get(0);
542    return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
543  }
544
545  private void appendToLog(String key) throws IOException {
546    final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
547      EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdit(key));
548    log.sync(txid);
549  }
550
551  private void appendEntriesToLogAndSync(int count) throws IOException {
552    long txid = -1L;
553    for (int i = 0; i < count; i++) {
554      txid = appendToLog(1);
555    }
556    log.sync(txid);
557  }
558
559  private WALEdit getWALEdit(String row) {
560    WALEdit edit = new WALEdit();
561    WALEditInternalHelper.addExtendedCell(edit, new KeyValue(Bytes.toBytes(row), family, qualifier,
562      EnvironmentEdgeManager.currentTime(), qualifier));
563    return edit;
564  }
565
566  private WALEntryFilter getDummyFilter() {
567    return new WALEntryFilter() {
568
569      @Override
570      public WAL.Entry filter(WAL.Entry entry) {
571        return entry;
572      }
573    };
574  }
575
576  private WALEntryFilter getIntermittentFailingFilter(int numFailuresInFilter) {
577    return new FailingWALEntryFilter(numFailuresInFilter);
578  }
579
580  public static class FailingWALEntryFilter implements WALEntryFilter {
581    private int numFailures = 0;
582    private static int countFailures = 0;
583
584    public FailingWALEntryFilter(int numFailuresInFilter) {
585      numFailures = numFailuresInFilter;
586    }
587
588    @Override
589    public WAL.Entry filter(WAL.Entry entry) {
590      if (countFailures == numFailures) {
591        return entry;
592      }
593      countFailures = countFailures + 1;
594      throw new WALEntryFilterRetryableException("failing filter");
595    }
596
597    public static int numFailures() {
598      return countFailures;
599    }
600  }
601
602  @Test
603  public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
604    appendToLog("1");
605    appendToLog("2");
606    long size = log.getLogFileSizeIfBeingWritten(getQueue().peek()).getAsLong();
607    AtomicLong fileLength = new AtomicLong(size - 1);
608    try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, 0,
609      p -> OptionalLong.of(fileLength.get()), new MetricsSource("1"), fakeWalGroupId)) {
610      assertNotNull(next(entryStream));
611      // can not get log 2
612      assertEquals(HasNext.RETRY, entryStream.hasNext());
613      Thread.sleep(1000);
614      // still can not get log 2
615      assertEquals(HasNext.RETRY, entryStream.hasNext());
616
617      // can get log 2 now
618      fileLength.set(size);
619      assertNotNull(next(entryStream));
620
621      assertEquals(HasNext.RETRY, entryStream.hasNext());
622    }
623  }
624
625  /**
626   * Test removal of 0 length log from logQueue if the source is a recovered source and size of
627   * logQueue is only 1.
628   */
629  @Test
630  public void testEOFExceptionForRecoveredQueue() throws Exception {
631    // Create a 0 length log.
632    Path emptyLog = new Path("emptyLog");
633    FSDataOutputStream fsdos = fs.create(emptyLog);
634    fsdos.close();
635    assertEquals(0, fs.getFileStatus(emptyLog).getLen());
636
637    Configuration conf = new Configuration(CONF);
638    // Override the max retries multiplier to fail fast.
639    conf.setInt("replication.source.maxretriesmultiplier", 1);
640    conf.setBoolean("replication.source.eof.autorecovery", true);
641    conf.setInt("replication.source.nb.batches", 10);
642    // Create a reader thread with source as recovered source.
643    ReplicationSource source = mockReplicationSource(true, conf);
644    when(source.isPeerEnabled()).thenReturn(true);
645
646    MetricsSource metrics = mock(MetricsSource.class);
647    doNothing().when(metrics).incrSizeOfLogQueue();
648    doNothing().when(metrics).decrSizeOfLogQueue();
649    ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
650    localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
651    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
652      getDummyFilter(), source, fakeWalGroupId);
653    reader.start();
654    reader.join();
655    // ReplicationSourceWALReaderThread#handleEofException method will
656    // remove empty log from logQueue.
657    assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
658  }
659
660  @Test
661  public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception {
662    Configuration conf = new Configuration(CONF);
663    MetricsSource metrics = mock(MetricsSource.class);
664    ReplicationSource source = mockReplicationSource(true, conf);
665    ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
666    // Create a 0 length log.
667    Path emptyLog = new Path(fs.getHomeDirectory(), "log.2." + isCompressionEnabled);
668    fs.create(emptyLog).close();
669    assertEquals(0, fs.getFileStatus(emptyLog).getLen());
670    localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
671
672    final Path log1 = new Path(fs.getHomeDirectory(), "log.1." + isCompressionEnabled);
673    WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration());
674    appendEntries(writer1, 3);
675    localLogQueue.enqueueLog(log1, fakeWalGroupId);
676
677    when(source.isPeerEnabled()).thenReturn(true);
678    // Override the max retries multiplier to fail fast.
679    conf.setInt("replication.source.maxretriesmultiplier", 1);
680    conf.setBoolean("replication.source.eof.autorecovery", true);
681    conf.setInt("replication.source.nb.batches", 10);
682    // Create a reader thread.
683    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
684      getDummyFilter(), source, fakeWalGroupId);
685    assertEquals("Initial log queue size is not correct", 2,
686      localLogQueue.getQueueSize(fakeWalGroupId));
687    reader.start();
688    reader.join();
689
690    // remove empty log from logQueue.
691    assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
692    assertEquals("Log queue should be empty", 0, localLogQueue.getQueueSize(fakeWalGroupId));
693  }
694
695  private PriorityBlockingQueue<Path> getQueue() {
696    return logQueue.getQueue(fakeWalGroupId);
697  }
698
699  private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
700    for (int i = 0; i < numEntries; i++) {
701      byte[] b = Bytes.toBytes(Integer.toString(i));
702      KeyValue kv = new KeyValue(b, b, b);
703      WALEdit edit = new WALEdit();
704      WALEditInternalHelper.addExtendedCell(edit, kv);
705      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID);
706      NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
707      scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
708      writer.append(new WAL.Entry(key, edit));
709      writer.sync(false);
710    }
711    writer.close();
712  }
713
714  /***
715   * Tests size of log queue is incremented and decremented properly.
716   */
717  @Test
718  public void testSizeOfLogQueue() throws Exception {
719    // There should be always 1 log which is current wal.
720    assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue());
721    appendToLogAndSync();
722
723    log.rollWriter();
724    // wait until the previous WAL file is cleanly closed, so later we can aleays see
725    // RETRY_IMMEDIATELY instead of RETRY. The wait here is necessary because the closing of a WAL
726    // writer is asynchronouns
727    TEST_UTIL.waitFor(30000, () -> fs.getClient().isFileClosed(logQueue.getQueue(fakeWalGroupId)
728      .peek().makeQualified(fs.getUri(), fs.getWorkingDirectory()).toUri().getPath()));
729    // After rolling there will be 2 wals in the queue
730    assertEquals(2, logQueue.getMetrics().getSizeOfLogQueue());
731
732    try (WALEntryStream entryStream =
733      new WALEntryStream(logQueue, fs, CONF, 0, log, logQueue.getMetrics(), fakeWalGroupId)) {
734      // There's one edit in the log, read it.
735      assertNotNull(next(entryStream));
736      // we've switched to the next WAL, and the previous WAL file is closed cleanly, so it is
737      // RETRY_IMMEDIATELY
738      assertEquals(HasNext.RETRY_IMMEDIATELY, entryStream.hasNext());
739    }
740    // After removing one wal, size of log queue will be 1 again.
741    assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue());
742  }
743
744  /**
745   * Tests that wals are closed cleanly and we read the trailer when we remove wal from
746   * WALEntryStream.
747   */
748  @Test
749  public void testCleanClosedWALs() throws Exception {
750    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF, 0, log,
751      logQueue.getMetrics(), fakeWalGroupId)) {
752      assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
753      appendToLogAndSync();
754      assertNotNull(next(entryStream));
755      log.rollWriter();
756      appendToLogAndSync();
757      assertNotNull(next(entryStream));
758      assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
759    }
760  }
761
762  /**
763   * Tests that we handle EOFException properly if the wal has moved to oldWALs directory.
764   */
765  @Test
766  public void testEOFExceptionInOldWALsDirectory() throws Exception {
767    assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
768    AbstractFSWAL<?> abstractWAL = (AbstractFSWAL<?>) log;
769    Path emptyLogFile = abstractWAL.getCurrentFileName();
770    log.rollWriter(true);
771
772    // AsyncFSWAl and FSHLog both moves the log from WALs to oldWALs directory asynchronously.
773    // Wait for in flight wal close count to become 0. This makes sure that empty wal is moved to
774    // oldWALs directory.
775    Waiter.waitFor(CONF, 5000,
776      (Waiter.Predicate<Exception>) () -> abstractWAL.getInflightWALCloseCount() == 0);
777    // There will 2 logs in the queue.
778    assertEquals(2, logQueue.getQueueSize(fakeWalGroupId));
779
780    // Get the archived dir path for the first wal.
781    Path archivePath = AbstractFSWALProvider.findArchivedLog(emptyLogFile, CONF);
782    // Make sure that the wal path is not the same as archived Dir path.
783    assertNotNull(archivePath);
784    assertTrue(fs.exists(archivePath));
785    fs.truncate(archivePath, 0);
786    // make sure the size of the wal file is 0.
787    assertEquals(0, fs.getFileStatus(archivePath).getLen());
788
789    ReplicationSource source = Mockito.mock(ReplicationSource.class);
790    when(source.isPeerEnabled()).thenReturn(true);
791
792    Configuration localConf = new Configuration(CONF);
793    localConf.setInt("replication.source.maxretriesmultiplier", 1);
794    localConf.setBoolean("replication.source.eof.autorecovery", true);
795    // Start the reader thread.
796    createReader(false, localConf);
797    // Wait for the replication queue size to be 1. This means that we have handled
798    // 0 length wal from oldWALs directory.
799    Waiter.waitFor(localConf, 10000,
800      (Waiter.Predicate<Exception>) () -> logQueue.getQueueSize(fakeWalGroupId) == 1);
801  }
802
803  /**
804   * This test is for HBASE-27778, when {@link WALEntryFilter#filter} throws exception for some
805   * entries in {@link WALEntryBatch},{@link ReplicationSourceWALReader#totalBufferUsed} should be
806   * decreased because {@link WALEntryBatch} is not put to
807   * {@link ReplicationSourceWALReader#entryBatchQueue}.
808   */
809  @Test
810  public void testReplicationSourceWALReaderWithPartialWALEntryFailingFilter() throws Exception {
811    appendEntriesToLogAndSync(3);
812    // get ending position
813    long position;
814    try (WALEntryStream entryStream =
815      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
816      for (int i = 0; i < 3; i++) {
817        assertNotNull(next(entryStream));
818      }
819      position = entryStream.getPosition();
820    }
821
822    Path walPath = getQueue().peek();
823    int maxThrowExceptionCount = 3;
824
825    ReplicationSource source = mockReplicationSource(false, CONF);
826    when(source.isPeerEnabled()).thenReturn(true);
827    PartialWALEntryFailingWALEntryFilter walEntryFilter =
828      new PartialWALEntryFailingWALEntryFilter(maxThrowExceptionCount, 3);
829    ReplicationSourceWALReader reader =
830      new ReplicationSourceWALReader(fs, CONF, logQueue, 0, walEntryFilter, source, fakeWalGroupId);
831    reader.start();
832    WALEntryBatch entryBatch = reader.take();
833
834    assertNotNull(entryBatch);
835    assertEquals(3, entryBatch.getWalEntries().size());
836    long sum = entryBatch.getWalEntries().stream()
837      .mapToLong(WALEntryBatch::getEntrySizeExcludeBulkLoad).sum();
838    assertEquals(position, entryBatch.getLastWalPosition());
839    assertEquals(walPath, entryBatch.getLastWalPath());
840    assertEquals(3, entryBatch.getNbRowKeys());
841    assertEquals(sum, source.getSourceManager().getTotalBufferUsed());
842    assertEquals(sum, source.getSourceManager().getGlobalMetrics().getWALReaderEditsBufferBytes());
843    assertEquals(maxThrowExceptionCount, walEntryFilter.getThrowExceptionCount());
844    assertNull(reader.poll(10));
845  }
846
847  // testcase for HBASE-28748
848  @Test
849  public void testWALEntryStreamEOFRightAfterHeader() throws Exception {
850    assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
851    AbstractFSWAL<?> abstractWAL = (AbstractFSWAL<?>) log;
852    Path emptyLogFile = abstractWAL.getCurrentFileName();
853    log.rollWriter(true);
854
855    // AsyncFSWAl and FSHLog both moves the log from WALs to oldWALs directory asynchronously.
856    // Wait for in flight wal close count to become 0. This makes sure that empty wal is moved to
857    // oldWALs directory.
858    Waiter.waitFor(CONF, 5000,
859      (Waiter.Predicate<Exception>) () -> abstractWAL.getInflightWALCloseCount() == 0);
860    // There will 2 logs in the queue.
861    assertEquals(2, logQueue.getQueueSize(fakeWalGroupId));
862    appendToLogAndSync();
863
864    Path archivedEmptyLogFile = AbstractFSWALProvider.findArchivedLog(emptyLogFile, CONF);
865
866    // read the wal header
867    ByteArrayOutputStream bos = new ByteArrayOutputStream();
868    bos.write(AbstractProtobufWALReader.PB_WAL_MAGIC);
869    try (FSDataInputStream in = fs.open(archivedEmptyLogFile)) {
870      IOUtils.skipFully(in, AbstractProtobufWALReader.PB_WAL_MAGIC.length);
871      WALHeader header = WALHeader.parseDelimitedFrom(in);
872      header.writeDelimitedTo(bos);
873    }
874    // truncate the first empty log so we have an incomplete header
875    try (FSDataOutputStream out = fs.create(archivedEmptyLogFile, true)) {
876      bos.writeTo(out);
877    }
878    try (WALEntryStream entryStream =
879      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
880      assertEquals(HasNext.RETRY_IMMEDIATELY, entryStream.hasNext());
881      assertNotNull(next(entryStream));
882    }
883  }
884
885  private static class PartialWALEntryFailingWALEntryFilter implements WALEntryFilter {
886    private int filteredWALEntryCount = -1;
887    private int walEntryCount = 0;
888    private int throwExceptionCount = -1;
889    private int maxThrowExceptionCount;
890
891    public PartialWALEntryFailingWALEntryFilter(int throwExceptionLimit, int walEntryCount) {
892      this.maxThrowExceptionCount = throwExceptionLimit;
893      this.walEntryCount = walEntryCount;
894    }
895
896    @Override
897    public WAL.Entry filter(WAL.Entry entry) {
898      filteredWALEntryCount++;
899      if (filteredWALEntryCount < walEntryCount - 1) {
900        return entry;
901      }
902
903      filteredWALEntryCount = -1;
904      throwExceptionCount++;
905      if (throwExceptionCount <= maxThrowExceptionCount - 1) {
906        throw new WALEntryFilterRetryableException("failing filter");
907      }
908      return entry;
909    }
910
911    public int getThrowExceptionCount() {
912      return throwExceptionCount;
913    }
914  }
915
916}