001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertSame;
026import static org.junit.Assert.assertThrows;
027import static org.junit.Assert.assertTrue;
028import static org.mockito.Mockito.doNothing;
029import static org.mockito.Mockito.mock;
030import static org.mockito.Mockito.when;
031
032import java.io.IOException;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.HashMap;
036import java.util.Map;
037import java.util.NavigableMap;
038import java.util.OptionalLong;
039import java.util.TreeMap;
040import java.util.UUID;
041import java.util.concurrent.ExecutionException;
042import java.util.concurrent.ForkJoinPool;
043import java.util.concurrent.Future;
044import java.util.concurrent.PriorityBlockingQueue;
045import java.util.concurrent.atomic.AtomicBoolean;
046import java.util.concurrent.atomic.AtomicInteger;
047import java.util.concurrent.atomic.AtomicLong;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.fs.FSDataOutputStream;
050import org.apache.hadoop.fs.Path;
051import org.apache.hadoop.hbase.Cell;
052import org.apache.hadoop.hbase.HConstants;
053import org.apache.hadoop.hbase.KeyValue;
054import org.apache.hadoop.hbase.Server;
055import org.apache.hadoop.hbase.TableName;
056import org.apache.hadoop.hbase.Waiter;
057import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
058import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
059import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
060import org.apache.hadoop.hbase.replication.WALEntryFilter;
061import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.HasNext;
062import org.apache.hadoop.hbase.util.Bytes;
063import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
064import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
065import org.apache.hadoop.hbase.wal.WAL;
066import org.apache.hadoop.hbase.wal.WAL.Entry;
067import org.apache.hadoop.hbase.wal.WALEdit;
068import org.apache.hadoop.hbase.wal.WALFactory;
069import org.apache.hadoop.hbase.wal.WALKeyImpl;
070import org.apache.hadoop.hbase.wal.WALProvider;
071import org.junit.Assert;
072import org.junit.Before;
073import org.junit.Test;
074import org.junit.runners.Parameterized.Parameter;
075import org.junit.runners.Parameterized.Parameters;
076import org.mockito.Mockito;
077
078import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
079
080public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
081
  /**
   * Parameter of the current run (see {@link #data()}); {@link #setUp()} writes it into the
   * configuration under {@link HConstants#ENABLE_WAL_COMPRESSION}.
   */
  @Parameter
  public boolean isCompressionEnabled;
084
085  @Parameters(name = "{index}: isCompressionEnabled={0}")
086  public static Iterable<Object[]> data() {
087    return Arrays.asList(new Object[] { false }, new Object[] { true });
088  }
089
090  @Before
091  public void setUp() throws Exception {
092    CONF.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, isCompressionEnabled);
093    initWAL();
094  }
095
096  private Entry next(WALEntryStream entryStream) {
097    assertEquals(HasNext.YES, entryStream.hasNext());
098    return entryStream.next();
099  }
100
101  /**
102   * Tests basic reading of log appends
103   */
104  @Test
105  public void testAppendsWithRolls() throws Exception {
106    appendToLogAndSync();
107    long oldPos;
108    try (WALEntryStream entryStream =
109      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
110      // There's one edit in the log, read it. Reading past it needs to throw exception
111      assertEquals(HasNext.YES, entryStream.hasNext());
112      WAL.Entry entry = entryStream.peek();
113      assertSame(entry, entryStream.next());
114      assertNotNull(entry);
115      assertEquals(HasNext.RETRY, entryStream.hasNext());
116      assertNull(entryStream.peek());
117      assertThrows(IllegalStateException.class, () -> entryStream.next());
118      oldPos = entryStream.getPosition();
119    }
120
121    appendToLogAndSync();
122
123    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF, oldPos, log,
124      new MetricsSource("1"), fakeWalGroupId)) {
125      // Read the newly added entry, make sure we made progress
126      WAL.Entry entry = next(entryStream);
127      assertNotEquals(oldPos, entryStream.getPosition());
128      assertNotNull(entry);
129      oldPos = entryStream.getPosition();
130    }
131
132    // We rolled but we still should see the end of the first log and get that item
133    appendToLogAndSync();
134    log.rollWriter();
135    appendToLogAndSync();
136
137    try (WALEntryStreamWithRetries entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF,
138      oldPos, log, new MetricsSource("1"), fakeWalGroupId)) {
139      WAL.Entry entry = next(entryStream);
140      assertNotEquals(oldPos, entryStream.getPosition());
141      assertNotNull(entry);
142
143      // next item should come from the new log
144      entry = next(entryStream);
145      assertNotEquals(oldPos, entryStream.getPosition());
146      assertNotNull(entry);
147
148      // no more entries to read, disable retry otherwise we will get a wait too much time error
149      entryStream.disableRetry();
150      assertEquals(HasNext.RETRY, entryStream.hasNext());
151      oldPos = entryStream.getPosition();
152    }
153  }
154
155  /**
156   * Tests that if after a stream is opened, more entries come in and then the log is rolled, we
157   * don't mistakenly dequeue the current log thinking we're done with it
158   */
159  @Test
160  public void testLogRollWhileStreaming() throws Exception {
161    appendToLog("1");
162    // 2
163    appendToLog("2");
164    try (WALEntryStreamWithRetries entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF,
165      0, log, new MetricsSource("1"), fakeWalGroupId)) {
166      assertEquals("1", getRow(next(entryStream)));
167
168      // 3 - comes in after reader opened
169      appendToLog("3");
170      // log roll happening while we're reading
171      log.rollWriter();
172      // 4 - this append is in the rolled log
173      appendToLog("4");
174
175      assertEquals("2", getRow(next(entryStream)));
176      // we should not have dequeued yet since there's still an entry in first log
177      assertEquals(2, getQueue().size());
178      // if implemented improperly, this would be 4 and 3 would be skipped
179      assertEquals("3", getRow(next(entryStream)));
180      // 4
181      assertEquals("4", getRow(next(entryStream)));
182      // now we've dequeued and moved on to next log properly
183      assertEquals(1, getQueue().size());
184
185      // disable so we can get the return value immediately, otherwise we will fail with wait too
186      // much time...
187      entryStream.disableRetry();
188      assertEquals(HasNext.RETRY, entryStream.hasNext());
189    }
190  }
191
192  /**
193   * Tests that if writes come in while we have a stream open, we shouldn't miss them
194   */
195
196  @Test
197  public void testNewEntriesWhileStreaming() throws Exception {
198    appendToLog("1");
199    try (WALEntryStream entryStream =
200      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
201      assertNotNull(next(entryStream)); // we've hit the end of the stream at this point
202
203      // some new entries come in while we're streaming
204      appendToLog("2");
205      appendToLog("3");
206
207      // don't see them
208      assertEquals(HasNext.RETRY, entryStream.hasNext());
209
210      // But we do if we retry next time, as the entryStream will reset the reader
211      assertEquals("2", getRow(next(entryStream)));
212      assertEquals("3", getRow(next(entryStream)));
213      // reached the end again
214      assertEquals(HasNext.RETRY, entryStream.hasNext());
215    }
216  }
217
218  @Test
219  public void testResumeStreamingFromPosition() throws Exception {
220    long lastPosition = 0;
221    appendToLog("1");
222    try (WALEntryStream entryStream =
223      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
224      assertNotNull(next(entryStream)); // we've hit the end of the stream at this point
225      appendToLog("2");
226      appendToLog("3");
227      lastPosition = entryStream.getPosition();
228    }
229    // next stream should picks up where we left off
230    try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log,
231      new MetricsSource("1"), fakeWalGroupId)) {
232      assertEquals("2", getRow(next(entryStream)));
233      assertEquals("3", getRow(next(entryStream)));
234      assertEquals(HasNext.RETRY, entryStream.hasNext()); // done
235      assertEquals(1, getQueue().size());
236    }
237  }
238
239  /**
240   * Tests that if we stop before hitting the end of a stream, we can continue where we left off
241   * using the last position
242   */
243  @Test
244  public void testPosition() throws Exception {
245    long lastPosition = 0;
246    appendEntriesToLogAndSync(3);
247    // read only one element
248    try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log,
249      new MetricsSource("1"), fakeWalGroupId)) {
250      assertNotNull(next(entryStream));
251      lastPosition = entryStream.getPosition();
252    }
253    // there should still be two more entries from where we left off
254    try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log,
255      new MetricsSource("1"), fakeWalGroupId)) {
256      assertNotNull(next(entryStream));
257      assertNotNull(next(entryStream));
258      assertEquals(HasNext.RETRY, entryStream.hasNext());
259    }
260  }
261
262  @Test
263  public void testEmptyStream() throws Exception {
264    try (WALEntryStream entryStream =
265      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
266      assertEquals(HasNext.RETRY, entryStream.hasNext());
267    }
268  }
269
270  @Test
271  public void testWALKeySerialization() throws Exception {
272    Map<String, byte[]> attributes = new HashMap<String, byte[]>();
273    attributes.put("foo", Bytes.toBytes("foo-value"));
274    attributes.put("bar", Bytes.toBytes("bar-value"));
275    WALKeyImpl key =
276      new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, EnvironmentEdgeManager.currentTime(),
277        new ArrayList<UUID>(), 0L, 0L, mvcc, scopes, attributes);
278    Assert.assertEquals(attributes, key.getExtendedAttributes());
279
280    WALProtos.WALKey.Builder builder = key.getBuilder(WALCellCodec.getNoneCompressor());
281    WALProtos.WALKey serializedKey = builder.build();
282
283    WALKeyImpl deserializedKey = new WALKeyImpl();
284    deserializedKey.readFieldsFromPb(serializedKey, WALCellCodec.getNoneUncompressor());
285
286    // equals() only checks region name, sequence id and write time
287    Assert.assertEquals(key, deserializedKey);
288    // can't use Map.equals() because byte arrays use reference equality
289    Assert.assertEquals(key.getExtendedAttributes().keySet(),
290      deserializedKey.getExtendedAttributes().keySet());
291    for (Map.Entry<String, byte[]> entry : deserializedKey.getExtendedAttributes().entrySet()) {
292      Assert.assertArrayEquals(key.getExtendedAttribute(entry.getKey()), entry.getValue());
293    }
294    Assert.assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes());
295  }
296
297  private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf)
298    throws IOException {
299    ReplicationSourceManager mockSourceManager = new ReplicationSourceManager(null, null, conf,
300      null, null, null, null, null, null, createMockGlobalMetrics());
301    Server mockServer = Mockito.mock(Server.class);
302    ReplicationSource source = Mockito.mock(ReplicationSource.class);
303    when(source.getSourceManager()).thenReturn(mockSourceManager);
304    when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
305    when(source.getWALFileLengthProvider()).thenReturn(log);
306    when(source.getServer()).thenReturn(mockServer);
307    when(source.isRecovered()).thenReturn(recovered);
308    return source;
309  }
310
311  private MetricsReplicationGlobalSourceSource createMockGlobalMetrics() {
312    MetricsReplicationGlobalSourceSource globalMetrics =
313      Mockito.mock(MetricsReplicationGlobalSourceSource.class);
314    final AtomicLong bufferUsedCounter = new AtomicLong(0);
315    Mockito.doAnswer((invocationOnMock) -> {
316      bufferUsedCounter.set(invocationOnMock.getArgument(0, Long.class));
317      return null;
318    }).when(globalMetrics).setWALReaderEditsBufferBytes(Mockito.anyLong());
319    when(globalMetrics.getWALReaderEditsBufferBytes())
320      .then(invocationOnMock -> bufferUsedCounter.get());
321    return globalMetrics;
322  }
323
324  private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf)
325    throws IOException {
326    ReplicationSource source = mockReplicationSource(recovered, conf);
327    when(source.isPeerEnabled()).thenReturn(true);
328    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0,
329      getDummyFilter(), source, fakeWalGroupId);
330    reader.start();
331    return reader;
332  }
333
334  private ReplicationSourceWALReader createReaderWithBadReplicationFilter(int numFailures,
335    Configuration conf) throws IOException {
336    ReplicationSource source = mockReplicationSource(false, conf);
337    when(source.isPeerEnabled()).thenReturn(true);
338    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0,
339      getIntermittentFailingFilter(numFailures), source, fakeWalGroupId);
340    reader.start();
341    return reader;
342  }
343
344  @Test
345  public void testReplicationSourceWALReader() throws Exception {
346    appendEntriesToLogAndSync(3);
347    // get ending position
348    long position;
349    try (WALEntryStream entryStream =
350      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
351      for (int i = 0; i < 3; i++) {
352        assertNotNull(next(entryStream));
353      }
354      position = entryStream.getPosition();
355    }
356
357    // start up a reader
358    Path walPath = getQueue().peek();
359    ReplicationSourceWALReader reader = createReader(false, CONF);
360    WALEntryBatch entryBatch = reader.take();
361
362    // should've batched up our entries
363    assertNotNull(entryBatch);
364    assertEquals(3, entryBatch.getWalEntries().size());
365    assertEquals(position, entryBatch.getLastWalPosition());
366    assertEquals(walPath, entryBatch.getLastWalPath());
367    assertEquals(3, entryBatch.getNbRowKeys());
368
369    appendToLog("foo");
370    entryBatch = reader.take();
371    assertEquals(1, entryBatch.getNbEntries());
372    assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
373  }
374
375  @Test
376  public void testReplicationSourceWALReaderWithFailingFilter() throws Exception {
377    appendEntriesToLogAndSync(3);
378    // get ending position
379    long position;
380    try (WALEntryStream entryStream =
381      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
382      for (int i = 0; i < 3; i++) {
383        assertNotNull(next(entryStream));
384      }
385      position = entryStream.getPosition();
386    }
387
388    // start up a reader
389    Path walPath = getQueue().peek();
390    int numFailuresInFilter = 5;
391    ReplicationSourceWALReader reader =
392      createReaderWithBadReplicationFilter(numFailuresInFilter, CONF);
393    WALEntryBatch entryBatch = reader.take();
394    assertEquals(numFailuresInFilter, FailingWALEntryFilter.numFailures());
395
396    // should've batched up our entries
397    assertNotNull(entryBatch);
398    assertEquals(3, entryBatch.getWalEntries().size());
399    assertEquals(position, entryBatch.getLastWalPosition());
400    assertEquals(walPath, entryBatch.getLastWalPath());
401    assertEquals(3, entryBatch.getNbRowKeys());
402  }
403
404  @Test
405  public void testReplicationSourceWALReaderRecovered() throws Exception {
406    appendEntriesToLogAndSync(10);
407    Path walPath = getQueue().peek();
408    log.rollWriter();
409    appendEntriesToLogAndSync(5);
410    log.shutdown();
411
412    Configuration conf = new Configuration(CONF);
413    conf.setInt("replication.source.nb.capacity", 10);
414
415    ReplicationSourceWALReader reader = createReader(true, conf);
416
417    WALEntryBatch batch = reader.take();
418    assertEquals(walPath, batch.getLastWalPath());
419    assertEquals(10, batch.getNbEntries());
420    assertFalse(batch.isEndOfFile());
421
422    batch = reader.take();
423    assertEquals(walPath, batch.getLastWalPath());
424    assertEquals(0, batch.getNbEntries());
425    assertTrue(batch.isEndOfFile());
426
427    walPath = getQueue().peek();
428    batch = reader.take();
429    assertEquals(walPath, batch.getLastWalPath());
430    assertEquals(5, batch.getNbEntries());
431    assertTrue(batch.isEndOfFile());
432
433    assertSame(WALEntryBatch.NO_MORE_DATA, reader.take());
434  }
435
436  // Testcase for HBASE-20206
437  @Test
438  public void testReplicationSourceWALReaderWrongPosition() throws Exception {
439    appendEntriesToLogAndSync(1);
440    Path walPath = getQueue().peek();
441    log.rollWriter();
442    appendEntriesToLogAndSync(20);
443    TEST_UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {
444
445      @Override
446      public boolean evaluate() throws Exception {
447        return fs.getFileStatus(walPath).getLen() > 0
448          && ((AbstractFSWAL<?>) log).getInflightWALCloseCount() == 0;
449      }
450
451      @Override
452      public String explainFailure() throws Exception {
453        return walPath + " has not been closed yet";
454      }
455
456    });
457
458    ReplicationSourceWALReader reader = createReader(false, CONF);
459
460    WALEntryBatch entryBatch = reader.take();
461    assertEquals(walPath, entryBatch.getLastWalPath());
462
463    long walLength = fs.getFileStatus(walPath).getLen();
464    assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is "
465      + walLength, entryBatch.getLastWalPosition() <= walLength);
466    assertEquals(1, entryBatch.getNbEntries());
467    assertTrue(entryBatch.isEndOfFile());
468
469    Path walPath2 = getQueue().peek();
470    entryBatch = reader.take();
471    assertEquals(walPath2, entryBatch.getLastWalPath());
472    assertEquals(20, entryBatch.getNbEntries());
473    assertFalse(entryBatch.isEndOfFile());
474
475    log.rollWriter();
476    appendEntriesToLogAndSync(10);
477    entryBatch = reader.take();
478    assertEquals(walPath2, entryBatch.getLastWalPath());
479    assertEquals(0, entryBatch.getNbEntries());
480    assertTrue(entryBatch.isEndOfFile());
481
482    Path walPath3 = getQueue().peek();
483    entryBatch = reader.take();
484    assertEquals(walPath3, entryBatch.getLastWalPath());
485    assertEquals(10, entryBatch.getNbEntries());
486    assertFalse(entryBatch.isEndOfFile());
487  }
488
489  @Test
490  public void testReplicationSourceWALReaderDisabled()
491    throws IOException, InterruptedException, ExecutionException {
492    appendEntriesToLogAndSync(3);
493    // get ending position
494    long position;
495    try (WALEntryStream entryStream =
496      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
497      for (int i = 0; i < 3; i++) {
498        assertNotNull(next(entryStream));
499      }
500      position = entryStream.getPosition();
501    }
502
503    // start up a reader
504    Path walPath = getQueue().peek();
505    ReplicationSource source = mockReplicationSource(false, CONF);
506    AtomicInteger invokeCount = new AtomicInteger(0);
507    AtomicBoolean enabled = new AtomicBoolean(false);
508    when(source.isPeerEnabled()).then(i -> {
509      invokeCount.incrementAndGet();
510      return enabled.get();
511    });
512
513    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, CONF, logQueue, 0,
514      getDummyFilter(), source, fakeWalGroupId);
515    reader.start();
516    Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
517      return reader.take();
518    });
519    // make sure that the isPeerEnabled has been called several times
520    TEST_UTIL.waitFor(30000, () -> invokeCount.get() >= 5);
521    // confirm that we can read nothing if the peer is disabled
522    assertFalse(future.isDone());
523    // then enable the peer, we should get the batch
524    enabled.set(true);
525    WALEntryBatch entryBatch = future.get();
526
527    // should've batched up our entries
528    assertNotNull(entryBatch);
529    assertEquals(3, entryBatch.getWalEntries().size());
530    assertEquals(position, entryBatch.getLastWalPosition());
531    assertEquals(walPath, entryBatch.getLastWalPath());
532    assertEquals(3, entryBatch.getNbRowKeys());
533  }
534
535  private String getRow(WAL.Entry entry) {
536    Cell cell = entry.getEdit().getCells().get(0);
537    return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
538  }
539
540  private void appendToLog(String key) throws IOException {
541    final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
542      EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdit(key));
543    log.sync(txid);
544  }
545
546  private void appendEntriesToLogAndSync(int count) throws IOException {
547    long txid = -1L;
548    for (int i = 0; i < count; i++) {
549      txid = appendToLog(1);
550    }
551    log.sync(txid);
552  }
553
554  private WALEdit getWALEdit(String row) {
555    WALEdit edit = new WALEdit();
556    edit.add(new KeyValue(Bytes.toBytes(row), family, qualifier,
557      EnvironmentEdgeManager.currentTime(), qualifier));
558    return edit;
559  }
560
561  private WALEntryFilter getDummyFilter() {
562    return new WALEntryFilter() {
563
564      @Override
565      public Entry filter(Entry entry) {
566        return entry;
567      }
568    };
569  }
570
571  private WALEntryFilter getIntermittentFailingFilter(int numFailuresInFilter) {
572    return new FailingWALEntryFilter(numFailuresInFilter);
573  }
574
575  public static class FailingWALEntryFilter implements WALEntryFilter {
576    private int numFailures = 0;
577    private static int countFailures = 0;
578
579    public FailingWALEntryFilter(int numFailuresInFilter) {
580      numFailures = numFailuresInFilter;
581    }
582
583    @Override
584    public Entry filter(Entry entry) {
585      if (countFailures == numFailures) {
586        return entry;
587      }
588      countFailures = countFailures + 1;
589      throw new WALEntryFilterRetryableException("failing filter");
590    }
591
592    public static int numFailures() {
593      return countFailures;
594    }
595  }
596
597  @Test
598  public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
599    appendToLog("1");
600    appendToLog("2");
601    long size = log.getLogFileSizeIfBeingWritten(getQueue().peek()).getAsLong();
602    AtomicLong fileLength = new AtomicLong(size - 1);
603    try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, 0,
604      p -> OptionalLong.of(fileLength.get()), new MetricsSource("1"), fakeWalGroupId)) {
605      assertNotNull(next(entryStream));
606      // can not get log 2
607      assertEquals(HasNext.RETRY, entryStream.hasNext());
608      Thread.sleep(1000);
609      // still can not get log 2
610      assertEquals(HasNext.RETRY, entryStream.hasNext());
611
612      // can get log 2 now
613      fileLength.set(size);
614      assertNotNull(next(entryStream));
615
616      assertEquals(HasNext.RETRY, entryStream.hasNext());
617    }
618  }
619
620  /**
621   * Test removal of 0 length log from logQueue if the source is a recovered source and size of
622   * logQueue is only 1.
623   */
624  @Test
625  public void testEOFExceptionForRecoveredQueue() throws Exception {
626    // Create a 0 length log.
627    Path emptyLog = new Path("emptyLog");
628    FSDataOutputStream fsdos = fs.create(emptyLog);
629    fsdos.close();
630    assertEquals(0, fs.getFileStatus(emptyLog).getLen());
631
632    Configuration conf = new Configuration(CONF);
633    // Override the max retries multiplier to fail fast.
634    conf.setInt("replication.source.maxretriesmultiplier", 1);
635    conf.setBoolean("replication.source.eof.autorecovery", true);
636    conf.setInt("replication.source.nb.batches", 10);
637    // Create a reader thread with source as recovered source.
638    ReplicationSource source = mockReplicationSource(true, conf);
639    when(source.isPeerEnabled()).thenReturn(true);
640
641    MetricsSource metrics = mock(MetricsSource.class);
642    doNothing().when(metrics).incrSizeOfLogQueue();
643    doNothing().when(metrics).decrSizeOfLogQueue();
644    ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
645    localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
646    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
647      getDummyFilter(), source, fakeWalGroupId);
648    reader.start();
649    reader.join();
650    // ReplicationSourceWALReaderThread#handleEofException method will
651    // remove empty log from logQueue.
652    assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
653  }
654
655  @Test
656  public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception {
657    Configuration conf = new Configuration(CONF);
658    MetricsSource metrics = mock(MetricsSource.class);
659    ReplicationSource source = mockReplicationSource(true, conf);
660    ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
661    // Create a 0 length log.
662    Path emptyLog = new Path(fs.getHomeDirectory(), "log.2." + isCompressionEnabled);
663    fs.create(emptyLog).close();
664    assertEquals(0, fs.getFileStatus(emptyLog).getLen());
665    localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
666
667    final Path log1 = new Path(fs.getHomeDirectory(), "log.1." + isCompressionEnabled);
668    WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration());
669    appendEntries(writer1, 3);
670    localLogQueue.enqueueLog(log1, fakeWalGroupId);
671
672    when(source.isPeerEnabled()).thenReturn(true);
673    // Override the max retries multiplier to fail fast.
674    conf.setInt("replication.source.maxretriesmultiplier", 1);
675    conf.setBoolean("replication.source.eof.autorecovery", true);
676    conf.setInt("replication.source.nb.batches", 10);
677    // Create a reader thread.
678    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
679      getDummyFilter(), source, fakeWalGroupId);
680    assertEquals("Initial log queue size is not correct", 2,
681      localLogQueue.getQueueSize(fakeWalGroupId));
682    reader.start();
683    reader.join();
684
685    // remove empty log from logQueue.
686    assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
687    assertEquals("Log queue should be empty", 0, localLogQueue.getQueueSize(fakeWalGroupId));
688  }
689
690  private PriorityBlockingQueue<Path> getQueue() {
691    return logQueue.getQueue(fakeWalGroupId);
692  }
693
694  private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
695    for (int i = 0; i < numEntries; i++) {
696      byte[] b = Bytes.toBytes(Integer.toString(i));
697      KeyValue kv = new KeyValue(b, b, b);
698      WALEdit edit = new WALEdit();
699      edit.add(kv);
700      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID);
701      NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
702      scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
703      writer.append(new WAL.Entry(key, edit));
704      writer.sync(false);
705    }
706    writer.close();
707  }
708
709  /***
710   * Tests size of log queue is incremented and decremented properly.
711   */
712  @Test
713  public void testSizeOfLogQueue() throws Exception {
714    // There should be always 1 log which is current wal.
715    assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue());
716    appendToLogAndSync();
717
718    log.rollWriter();
719    // wait until the previous WAL file is cleanly closed, so later we can aleays see
720    // RETRY_IMMEDIATELY instead of RETRY. The wait here is necessary because the closing of a WAL
721    // writer is asynchronouns
722    TEST_UTIL.waitFor(30000, () -> fs.getClient().isFileClosed(logQueue.getQueue(fakeWalGroupId)
723      .peek().makeQualified(fs.getUri(), fs.getWorkingDirectory()).toUri().getPath()));
724    // After rolling there will be 2 wals in the queue
725    assertEquals(2, logQueue.getMetrics().getSizeOfLogQueue());
726
727    try (WALEntryStream entryStream =
728      new WALEntryStream(logQueue, fs, CONF, 0, log, logQueue.getMetrics(), fakeWalGroupId)) {
729      // There's one edit in the log, read it.
730      assertNotNull(next(entryStream));
731      // we've switched to the next WAL, and the previous WAL file is closed cleanly, so it is
732      // RETRY_IMMEDIATELY
733      assertEquals(HasNext.RETRY_IMMEDIATELY, entryStream.hasNext());
734    }
735    // After removing one wal, size of log queue will be 1 again.
736    assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue());
737  }
738
739  /**
740   * Tests that wals are closed cleanly and we read the trailer when we remove wal from
741   * WALEntryStream.
742   */
743  @Test
744  public void testCleanClosedWALs() throws Exception {
745    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF, 0, log,
746      logQueue.getMetrics(), fakeWalGroupId)) {
747      assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
748      appendToLogAndSync();
749      assertNotNull(next(entryStream));
750      log.rollWriter();
751      appendToLogAndSync();
752      assertNotNull(next(entryStream));
753      assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
754    }
755  }
756
757  /**
758   * Tests that we handle EOFException properly if the wal has moved to oldWALs directory.
759   */
760  @Test
761  public void testEOFExceptionInOldWALsDirectory() throws Exception {
762    assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
763    AbstractFSWAL<?> abstractWAL = (AbstractFSWAL<?>) log;
764    Path emptyLogFile = abstractWAL.getCurrentFileName();
765    log.rollWriter(true);
766
767    // AsyncFSWAl and FSHLog both moves the log from WALs to oldWALs directory asynchronously.
768    // Wait for in flight wal close count to become 0. This makes sure that empty wal is moved to
769    // oldWALs directory.
770    Waiter.waitFor(CONF, 5000,
771      (Waiter.Predicate<Exception>) () -> abstractWAL.getInflightWALCloseCount() == 0);
772    // There will 2 logs in the queue.
773    assertEquals(2, logQueue.getQueueSize(fakeWalGroupId));
774
775    // Get the archived dir path for the first wal.
776    Path archivePath = AbstractFSWALProvider.findArchivedLog(emptyLogFile, CONF);
777    // Make sure that the wal path is not the same as archived Dir path.
778    assertNotNull(archivePath);
779    assertTrue(fs.exists(archivePath));
780    fs.truncate(archivePath, 0);
781    // make sure the size of the wal file is 0.
782    assertEquals(0, fs.getFileStatus(archivePath).getLen());
783
784    ReplicationSource source = Mockito.mock(ReplicationSource.class);
785    when(source.isPeerEnabled()).thenReturn(true);
786
787    Configuration localConf = new Configuration(CONF);
788    localConf.setInt("replication.source.maxretriesmultiplier", 1);
789    localConf.setBoolean("replication.source.eof.autorecovery", true);
790    // Start the reader thread.
791    createReader(false, localConf);
792    // Wait for the replication queue size to be 1. This means that we have handled
793    // 0 length wal from oldWALs directory.
794    Waiter.waitFor(localConf, 10000,
795      (Waiter.Predicate<Exception>) () -> logQueue.getQueueSize(fakeWalGroupId) == 1);
796  }
797
798  /**
799   * This test is for HBASE-27778, when {@link WALEntryFilter#filter} throws exception for some
800   * entries in {@link WALEntryBatch},{@link ReplicationSourceWALReader#totalBufferUsed} should be
801   * decreased because {@link WALEntryBatch} is not put to
802   * {@link ReplicationSourceWALReader#entryBatchQueue}.
803   */
804  @Test
805  public void testReplicationSourceWALReaderWithPartialWALEntryFailingFilter() throws Exception {
806    appendEntriesToLogAndSync(3);
807    // get ending position
808    long position;
809    try (WALEntryStream entryStream =
810      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
811      for (int i = 0; i < 3; i++) {
812        assertNotNull(next(entryStream));
813      }
814      position = entryStream.getPosition();
815    }
816
817    Path walPath = getQueue().peek();
818    int maxThrowExceptionCount = 3;
819
820    ReplicationSource source = mockReplicationSource(false, CONF);
821    when(source.isPeerEnabled()).thenReturn(true);
822    PartialWALEntryFailingWALEntryFilter walEntryFilter =
823      new PartialWALEntryFailingWALEntryFilter(maxThrowExceptionCount, 3);
824    ReplicationSourceWALReader reader =
825      new ReplicationSourceWALReader(fs, CONF, logQueue, 0, walEntryFilter, source, fakeWalGroupId);
826    reader.start();
827    WALEntryBatch entryBatch = reader.take();
828
829    assertNotNull(entryBatch);
830    assertEquals(3, entryBatch.getWalEntries().size());
831    long sum = entryBatch.getWalEntries().stream()
832      .mapToLong(WALEntryBatch::getEntrySizeExcludeBulkLoad).sum();
833    assertEquals(position, entryBatch.getLastWalPosition());
834    assertEquals(walPath, entryBatch.getLastWalPath());
835    assertEquals(3, entryBatch.getNbRowKeys());
836    assertEquals(sum, source.getSourceManager().getTotalBufferUsed());
837    assertEquals(sum, source.getSourceManager().getGlobalMetrics().getWALReaderEditsBufferBytes());
838    assertEquals(maxThrowExceptionCount, walEntryFilter.getThrowExceptionCount());
839    assertNull(reader.poll(10));
840  }
841
842  private static class PartialWALEntryFailingWALEntryFilter implements WALEntryFilter {
843    private int filteredWALEntryCount = -1;
844    private int walEntryCount = 0;
845    private int throwExceptionCount = -1;
846    private int maxThrowExceptionCount;
847
848    public PartialWALEntryFailingWALEntryFilter(int throwExceptionLimit, int walEntryCount) {
849      this.maxThrowExceptionCount = throwExceptionLimit;
850      this.walEntryCount = walEntryCount;
851    }
852
853    @Override
854    public Entry filter(Entry entry) {
855      filteredWALEntryCount++;
856      if (filteredWALEntryCount < walEntryCount - 1) {
857        return entry;
858      }
859
860      filteredWALEntryCount = -1;
861      throwExceptionCount++;
862      if (throwExceptionCount <= maxThrowExceptionCount - 1) {
863        throw new WALEntryFilterRetryableException("failing filter");
864      }
865      return entry;
866    }
867
868    public int getThrowExceptionCount() {
869      return throwExceptionCount;
870    }
871  }
872
873}