001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertTrue;
026import static org.mockito.Mockito.doNothing;
027import static org.mockito.Mockito.mock;
028import static org.mockito.Mockito.when;
029
030import java.io.IOException;
031import java.util.ArrayList;
032import java.util.OptionalLong;
033import java.util.UUID;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import java.util.concurrent.Future;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.FileSystem;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.Cell;
041import org.apache.hadoop.hbase.CellBuilderFactory;
042import org.apache.hadoop.hbase.CellBuilderType;
043import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
044import org.apache.hadoop.hbase.HBaseClassTestRule;
045import org.apache.hadoop.hbase.HBaseConfiguration;
046import org.apache.hadoop.hbase.HBaseTestingUtility;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.KeyValue;
049import org.apache.hadoop.hbase.MiniHBaseCluster;
050import org.apache.hadoop.hbase.Server;
051import org.apache.hadoop.hbase.ServerName;
052import org.apache.hadoop.hbase.TableName;
053import org.apache.hadoop.hbase.Waiter;
054import org.apache.hadoop.hbase.client.Admin;
055import org.apache.hadoop.hbase.regionserver.HRegionServer;
056import org.apache.hadoop.hbase.regionserver.RegionServerServices;
057import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
058import org.apache.hadoop.hbase.replication.ReplicationPeer;
059import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
060import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
061import org.apache.hadoop.hbase.replication.WALEntryFilter;
062import org.apache.hadoop.hbase.testclassification.MediumTests;
063import org.apache.hadoop.hbase.testclassification.ReplicationTests;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
066import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
067import org.apache.hadoop.hbase.wal.WAL;
068import org.apache.hadoop.hbase.wal.WALEdit;
069import org.apache.hadoop.hbase.wal.WALFactory;
070import org.apache.hadoop.hbase.wal.WALKeyImpl;
071import org.apache.hadoop.hbase.wal.WALProvider;
072import org.apache.hadoop.hbase.wal.WALStreamReader;
073import org.junit.AfterClass;
074import org.junit.BeforeClass;
075import org.junit.ClassRule;
076import org.junit.Test;
077import org.junit.experimental.categories.Category;
078import org.mockito.Mockito;
079import org.slf4j.Logger;
080import org.slf4j.LoggerFactory;
081
082@Category({ ReplicationTests.class, MediumTests.class })
083public class TestReplicationSource {
084
085  @ClassRule
086  public static final HBaseClassTestRule CLASS_RULE =
087    HBaseClassTestRule.forClass(TestReplicationSource.class);
088
089  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSource.class);
090  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
091  private final static HBaseTestingUtility TEST_UTIL_PEER = new HBaseTestingUtility();
092  private static FileSystem FS;
093  private static Path oldLogDir;
094  private static Path logDir;
095  private static Configuration conf = TEST_UTIL.getConfiguration();
096
097  @BeforeClass
098  public static void setUpBeforeClass() throws Exception {
099    TEST_UTIL.startMiniDFSCluster(1);
100    FS = TEST_UTIL.getDFSCluster().getFileSystem();
101    Path rootDir = TEST_UTIL.createRootDir();
102    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
103    if (FS.exists(oldLogDir)) {
104      FS.delete(oldLogDir, true);
105    }
106    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
107    if (FS.exists(logDir)) {
108      FS.delete(logDir, true);
109    }
110  }
111
112  @AfterClass
113  public static void tearDownAfterClass() throws Exception {
114    TEST_UTIL_PEER.shutdownMiniHBaseCluster();
115    TEST_UTIL.shutdownMiniHBaseCluster();
116    TEST_UTIL.shutdownMiniDFSCluster();
117  }
118
119  /**
120   * Test the default ReplicationSource skips queuing hbase:meta WAL files.
121   */
122  @Test
123  public void testDefaultSkipsMetaWAL() throws IOException {
124    ReplicationSource rs = new ReplicationSource();
125    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
126    conf.setInt("replication.source.maxretriesmultiplier", 1);
127    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
128    when(mockPeer.getConfiguration()).thenReturn(conf);
129    when(mockPeer.getPeerBandwidth()).thenReturn(0L);
130    ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
131    when(peerConfig.getReplicationEndpointImpl())
132      .thenReturn(DoNothingReplicationEndpoint.class.getName());
133    when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
134    ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
135    Mockito.when(manager.getGlobalMetrics())
136      .thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
137    String queueId = "qid";
138    RegionServerServices rss =
139      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
140    rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, p -> OptionalLong.empty(),
141      new MetricsSource(queueId));
142    try {
143      rs.startup();
144      assertTrue(rs.isSourceActive());
145      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
146      rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
147      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
148      rs.enqueueLog(new Path("a.1"));
149      assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
150    } finally {
151      rs.terminate("Done");
152      rss.stop("Done");
153    }
154  }
155
156  /**
157   * Test that we filter out meta edits, etc.
158   */
159  @Test
160  public void testWALEntryFilter() throws IOException {
161    // To get the fully constructed default WALEntryFilter, need to create a ReplicationSource
162    // instance and init it.
163    ReplicationSource rs = new ReplicationSource();
164    UUID uuid = UUID.randomUUID();
165    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
166    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
167    when(mockPeer.getConfiguration()).thenReturn(conf);
168    when(mockPeer.getPeerBandwidth()).thenReturn(0L);
169    ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
170    when(peerConfig.getReplicationEndpointImpl())
171      .thenReturn(DoNothingReplicationEndpoint.class.getName());
172    when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
173    ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
174    String queueId = "qid";
175    RegionServerServices rss =
176      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
177    rs.init(conf, null, manager, null, mockPeer, rss, queueId, uuid, p -> OptionalLong.empty(),
178      new MetricsSource(queueId));
179    try {
180      rs.startup();
181      TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null);
182      WALEntryFilter wef = rs.getWalEntryFilter();
183      // Test non-system WAL edit.
184      WALEdit we = new WALEdit()
185        .add(CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(HConstants.EMPTY_START_ROW)
186          .setFamily(HConstants.CATALOG_FAMILY).setType(Cell.Type.Put).build());
187      WAL.Entry e = new WAL.Entry(
188        new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.valueOf("test"), -1, -1, uuid), we);
189      assertTrue(wef.filter(e) == e);
190      // Test system WAL edit.
191      e = new WAL.Entry(
192        new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.META_TABLE_NAME, -1, -1, uuid), we);
193      assertNull(wef.filter(e));
194    } finally {
195      rs.terminate("Done");
196      rss.stop("Done");
197    }
198  }
199
200  /**
201   * Sanity check that we can move logs around while we are reading from them. Should this test
202   * fail, ReplicationSource would have a hard time reading logs that are being archived.
203   */
204  // This tests doesn't belong in here... it is not about ReplicationSource.
205  @Test
206  public void testLogMoving() throws Exception {
207    Path logPath = new Path(logDir, "log");
208    if (!FS.exists(logDir)) {
209      FS.mkdirs(logDir);
210    }
211    if (!FS.exists(oldLogDir)) {
212      FS.mkdirs(oldLogDir);
213    }
214    WALProvider.Writer writer =
215      WALFactory.createWALWriter(FS, logPath, TEST_UTIL.getConfiguration());
216    for (int i = 0; i < 3; i++) {
217      byte[] b = Bytes.toBytes(Integer.toString(i));
218      KeyValue kv = new KeyValue(b, b, b);
219      WALEdit edit = new WALEdit();
220      edit.add(kv);
221      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID);
222      writer.append(new WAL.Entry(key, edit));
223      writer.sync(false);
224    }
225    writer.close();
226
227    WALStreamReader reader =
228      WALFactory.createStreamReader(FS, logPath, TEST_UTIL.getConfiguration());
229    WAL.Entry entry = reader.next();
230    assertNotNull(entry);
231
232    Path oldLogPath = new Path(oldLogDir, "log");
233    FS.rename(logPath, oldLogPath);
234
235    entry = reader.next();
236    assertNotNull(entry);
237
238    reader.next();
239    entry = reader.next();
240
241    assertNull(entry);
242    reader.close();
243  }
244
245  /**
246   * Tests that {@link ReplicationSource#terminate(String)} will timeout properly Moved here from
247   * TestReplicationSource because doesn't need cluster.
248   */
249  @Test
250  public void testTerminateTimeout() throws Exception {
251    ReplicationSource source = new ReplicationSource();
252    ReplicationEndpoint replicationEndpoint = new DoNothingReplicationEndpoint();
253    try {
254      replicationEndpoint.start();
255      ReplicationPeer mockPeer = mock(ReplicationPeer.class);
256      when(mockPeer.getPeerBandwidth()).thenReturn(0L);
257      Configuration testConf = HBaseConfiguration.create();
258      testConf.setInt("replication.source.maxretriesmultiplier", 1);
259      ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
260      source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null,
261        p -> OptionalLong.empty(), null);
262      ExecutorService executor = Executors.newSingleThreadExecutor();
263      Future<?> future = executor.submit(() -> source.terminate("testing source termination"));
264      long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
265      Waiter.waitFor(testConf, sleepForRetries * 2, (Waiter.Predicate<Exception>) future::isDone);
266    } finally {
267      replicationEndpoint.stop();
268    }
269  }
270
271  @Test
272  public void testTerminateClearsBuffer() throws Exception {
273    ReplicationSource source = new ReplicationSource();
274    ReplicationSourceManager mockManager = new ReplicationSourceManager(null, null, conf, null,
275      null, null, null, null, null, mock(MetricsReplicationGlobalSourceSource.class));
276    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
277    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
278    Configuration testConf = HBaseConfiguration.create();
279    source.init(testConf, null, mockManager, null, mockPeer, Mockito.mock(Server.class), "testPeer",
280      null, p -> OptionalLong.empty(), mock(MetricsSource.class));
281    ReplicationSourceWALReader reader =
282      new ReplicationSourceWALReader(null, conf, null, 0, null, source, null);
283    ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, null, source);
284    shipper.entryReader = reader;
285    source.workerThreads.put("testPeer", shipper);
286    WALEntryBatch batch = new WALEntryBatch(10, logDir);
287    WAL.Entry mockEntry = mock(WAL.Entry.class);
288    WALEdit mockEdit = mock(WALEdit.class);
289    WALKeyImpl mockKey = mock(WALKeyImpl.class);
290    when(mockEntry.getEdit()).thenReturn(mockEdit);
291    when(mockEdit.isEmpty()).thenReturn(false);
292    when(mockEntry.getKey()).thenReturn(mockKey);
293    when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L);
294    when(mockEdit.heapSize()).thenReturn(10000L);
295    when(mockEdit.size()).thenReturn(0);
296    ArrayList<Cell> cells = new ArrayList<>();
297    KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"), Bytes.toBytes("1"),
298      Bytes.toBytes("v1"));
299    cells.add(kv);
300    when(mockEdit.getCells()).thenReturn(cells);
301    reader.addEntryToBatch(batch, mockEntry);
302    reader.entryBatchQueue.put(batch);
303    source.terminate("test");
304    assertEquals(0, source.getSourceManager().getTotalBufferUsed());
305  }
306
307  /**
308   * Tests that recovered queues are preserved on a regionserver shutdown. See HBASE-18192
309   */
310  @Test
311  public void testServerShutdownRecoveredQueue() throws Exception {
312    try {
313      // Ensure single-threaded WAL
314      conf.set("hbase.wal.provider", "defaultProvider");
315      conf.setInt("replication.sleep.before.failover", 2000);
316      // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
317      conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
318      MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
319      TEST_UTIL_PEER.startMiniCluster(1);
320
321      HRegionServer serverA = cluster.getRegionServer(0);
322      final ReplicationSourceManager managerA =
323        serverA.getReplicationSourceService().getReplicationManager();
324      HRegionServer serverB = cluster.getRegionServer(1);
325      final ReplicationSourceManager managerB =
326        serverB.getReplicationSourceService().getReplicationManager();
327      final Admin admin = TEST_UTIL.getAdmin();
328
329      final String peerId = "TestPeer";
330      admin.addReplicationPeer(peerId,
331        ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build());
332      // Wait for replication sources to come up
333      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
334        @Override
335        public boolean evaluate() {
336          return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
337        }
338      });
339      // Disabling peer makes sure there is at least one log to claim when the server dies
340      // The recovered queue will also stay there until the peer is disabled even if the
341      // WALs it contains have no data.
342      admin.disableReplicationPeer(peerId);
343
344      // Stopping serverA
345      // It's queues should be claimed by the only other alive server i.e. serverB
346      cluster.stopRegionServer(serverA.getServerName());
347      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
348        @Override
349        public boolean evaluate() throws Exception {
350          return managerB.getOldSources().size() == 1;
351        }
352      });
353
354      final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
355      serverC.waitForServerOnline();
356      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
357        @Override
358        public boolean evaluate() throws Exception {
359          return serverC.getReplicationSourceService() != null;
360        }
361      });
362      final ReplicationSourceManager managerC =
363        ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
364      // Sanity check
365      assertEquals(0, managerC.getOldSources().size());
366
367      // Stopping serverB
368      // Now serverC should have two recovered queues:
369      // 1. The serverB's normal queue
370      // 2. serverA's recovered queue on serverB
371      cluster.stopRegionServer(serverB.getServerName());
372      Waiter.waitFor(conf, 20000,
373        (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 2);
374      admin.enableReplicationPeer(peerId);
375      Waiter.waitFor(conf, 20000,
376        (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 0);
377    } finally {
378      conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
379    }
380  }
381
382  /**
383   * Regionserver implementation that adds a delay on the graceful shutdown.
384   */
385  public static class ShutdownDelayRegionServer extends HRegionServer {
386    public ShutdownDelayRegionServer(Configuration conf) throws IOException {
387      super(conf);
388    }
389
390    @Override
391    protected void stopServiceThreads() {
392      // Add a delay before service threads are shutdown.
393      // This will keep the zookeeper connection alive for the duration of the delay.
394      LOG.info("Adding a delay to the regionserver shutdown");
395      try {
396        Thread.sleep(2000);
397      } catch (InterruptedException ex) {
398        LOG.error("Interrupted while sleeping");
399      }
400      super.stopServiceThreads();
401    }
402  }
403
404  /**
405   * Deadend Endpoint. Does nothing.
406   */
407  public static class DoNothingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint {
408    private final UUID uuid = UUID.randomUUID();
409
410    @Override
411    public void init(Context context) throws IOException {
412      this.ctx = context;
413    }
414
415    @Override
416    public WALEntryFilter getWALEntryfilter() {
417      return null;
418    }
419
420    @Override
421    public synchronized UUID getPeerUUID() {
422      return this.uuid;
423    }
424
425    @Override
426    protected void doStart() {
427      notifyStarted();
428    }
429
430    @Override
431    protected void doStop() {
432      notifyStopped();
433    }
434
435    @Override
436    public boolean canReplicateToSameCluster() {
437      return true;
438    }
439  }
440
441  /**
442   * Deadend Endpoint. Does nothing.
443   */
444  public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint {
445
446    static int count = 0;
447
448    @Override
449    public synchronized UUID getPeerUUID() {
450      if (count == 0) {
451        count++;
452        throw new RuntimeException();
453      } else {
454        return super.getPeerUUID();
455      }
456    }
457
458  }
459
460  /**
461   * Bad Endpoint with failing connection to peer on demand.
462   */
463  public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint {
464    static boolean failing = true;
465
466    @Override
467    public synchronized UUID getPeerUUID() {
468      return failing ? null : super.getPeerUUID();
469    }
470  }
471
472  public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint {
473
474    static int count = 0;
475
476    @Override
477    public synchronized UUID getPeerUUID() {
478      throw new RuntimeException();
479    }
480
481  }
482
483  /**
484   * Test HBASE-20497 Moved here from TestReplicationSource because doesn't need cluster.
485   */
486  @Test
487  public void testRecoveredReplicationSourceShipperGetPosition() throws Exception {
488    String walGroupId = "fake-wal-group-id";
489    ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L);
490    ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
491    RecoveredReplicationSource source = mock(RecoveredReplicationSource.class);
492    Server server = mock(Server.class);
493    when(server.getServerName()).thenReturn(serverName);
494    when(source.getServer()).thenReturn(server);
495    when(source.getServerWALsBelongTo()).thenReturn(deadServer);
496    ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class);
497    when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
498      .thenReturn(1001L);
499    when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
500      .thenReturn(-1L);
501    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
502    conf.setInt("replication.source.maxretriesmultiplier", -1);
503    MetricsSource metricsSource = mock(MetricsSource.class);
504    doNothing().when(metricsSource).incrSizeOfLogQueue();
505    ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metricsSource, source);
506    logQueue.enqueueLog(new Path("/www/html/test"), walGroupId);
507    RecoveredReplicationSourceShipper shipper =
508      new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, source, storage);
509    assertEquals(1001L, shipper.getStartPosition());
510  }
511
512  private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf,
513    String endpointName) throws IOException {
514    conf.setInt("replication.source.maxretriesmultiplier", 1);
515    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
516    when(mockPeer.getConfiguration()).thenReturn(conf);
517    when(mockPeer.getPeerBandwidth()).thenReturn(0L);
518    ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
519    FaultyReplicationEndpoint.count = 0;
520    when(peerConfig.getReplicationEndpointImpl()).thenReturn(endpointName);
521    when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
522    ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
523    Mockito.when(manager.getGlobalMetrics())
524      .thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
525    String queueId = "qid";
526    RegionServerServices rss =
527      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
528    rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, p -> OptionalLong.empty(),
529      new MetricsSource(queueId));
530    return rss;
531  }
532
533  /**
534   * Test ReplicationSource retries startup once an uncaught exception happens during initialization
535   * and <b>eplication.source.regionserver.abort</b> is set to false.
536   */
537  @Test
538  public void testAbortFalseOnError() throws IOException {
539    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
540    conf.setBoolean("replication.source.regionserver.abort", false);
541    ReplicationSource rs = new ReplicationSource();
542    RegionServerServices rss =
543      setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName());
544    try {
545      rs.startup();
546      assertTrue(rs.isSourceActive());
547      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
548      rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
549      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
550      rs.enqueueLog(new Path("a.1"));
551      assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
552    } finally {
553      rs.terminate("Done");
554      rss.stop("Done");
555    }
556  }
557
558  @Test
559  public void testReplicationSourceInitializingMetric() throws IOException {
560    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
561    conf.setBoolean("replication.source.regionserver.abort", false);
562    ReplicationSource rs = new ReplicationSource();
563    RegionServerServices rss = setupForAbortTests(rs, conf, BadReplicationEndpoint.class.getName());
564    try {
565      rs.startup();
566      assertTrue(rs.isSourceActive());
567      Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 1);
568      BadReplicationEndpoint.failing = false;
569      Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 0);
570    } finally {
571      rs.terminate("Done");
572      rss.stop("Done");
573    }
574  }
575
576  /**
577   * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread,
578   * when <b>replication.source.regionserver.abort</b> is set to false.
579   */
580  @Test
581  public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException {
582    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
583    ReplicationSource rs = new ReplicationSource();
584    RegionServerServices rss =
585      setupForAbortTests(rs, conf, FaultyReplicationEndpoint.class.getName());
586    try {
587      rs.startup();
588      assertTrue(true);
589    } finally {
590      rs.terminate("Done");
591      rss.stop("Done");
592    }
593  }
594
595  /**
596   * Test ReplicationSource retries startup once an uncaught exception happens during initialization
597   * and <b>replication.source.regionserver.abort</b> is set to true.
598   */
599  @Test
600  public void testAbortTrueOnError() throws IOException {
601    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
602    ReplicationSource rs = new ReplicationSource();
603    RegionServerServices rss =
604      setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName());
605    try {
606      rs.startup();
607      assertTrue(rs.isSourceActive());
608      Waiter.waitFor(conf, 1000, () -> rss.isAborted());
609      assertTrue(rss.isAborted());
610      Waiter.waitFor(conf, 1000, () -> !rs.isSourceActive());
611      assertFalse(rs.isSourceActive());
612    } finally {
613      rs.terminate("Done");
614      rss.stop("Done");
615    }
616  }
617
618  /*
619   * Test age of oldest wal metric.
620   */
621  @Test
622  public void testAgeOfOldestWal() throws Exception {
623    try {
624      ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
625      EnvironmentEdgeManager.injectEdge(manualEdge);
626
627      String id = "1";
628      MetricsSource metrics = new MetricsSource(id);
629      Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
630      conf.setInt("replication.source.maxretriesmultiplier", 1);
631      ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
632      Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
633      Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
634      ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
635      Mockito.when(peerConfig.getReplicationEndpointImpl())
636        .thenReturn(DoNothingReplicationEndpoint.class.getName());
637      Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
638      ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
639      Mockito.when(manager.getGlobalMetrics())
640        .thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
641      RegionServerServices rss =
642        TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
643
644      ReplicationSource source = new ReplicationSource();
645      source.init(conf, null, manager, null, mockPeer, rss, id, null, p -> OptionalLong.empty(),
646        metrics);
647
648      final Path log1 = new Path(logDir, "log-walgroup-a.8");
649      manualEdge.setValue(10);
650      // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2.
651      source.enqueueLog(log1);
652      MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id);
653      assertEquals(2, metricsSource1.getOldestWalAge());
654
655      final Path log2 = new Path(logDir, "log-walgroup-b.4");
656      // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6
657      source.enqueueLog(log2);
658      assertEquals(6, metricsSource1.getOldestWalAge());
659      // Clear all metrics.
660      metrics.clear();
661    } finally {
662      EnvironmentEdgeManager.reset();
663    }
664  }
665
666  private MetricsReplicationSourceSource getSourceMetrics(String sourceId) {
667    MetricsReplicationSourceFactory factory =
668      CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class);
669    return factory.getSource(sourceId);
670  }
671}