001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertTrue;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.when;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.OptionalLong;
032import java.util.UUID;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.Executors;
035import java.util.concurrent.Future;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.Cell;
040import org.apache.hadoop.hbase.CellBuilderType;
041import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
042import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseConfiguration;
045import org.apache.hadoop.hbase.HBaseTestingUtil;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.KeyValue;
048import org.apache.hadoop.hbase.Server;
049import org.apache.hadoop.hbase.ServerName;
050import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.Waiter;
053import org.apache.hadoop.hbase.client.Admin;
054import org.apache.hadoop.hbase.regionserver.HRegionServer;
055import org.apache.hadoop.hbase.regionserver.RegionServerServices;
056import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
057import org.apache.hadoop.hbase.replication.ReplicationPeer;
058import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
059import org.apache.hadoop.hbase.replication.ReplicationQueueData;
060import org.apache.hadoop.hbase.replication.ReplicationQueueId;
061import org.apache.hadoop.hbase.replication.WALEntryFilter;
062import org.apache.hadoop.hbase.testclassification.MediumTests;
063import org.apache.hadoop.hbase.testclassification.ReplicationTests;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
066import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
067import org.apache.hadoop.hbase.wal.WAL;
068import org.apache.hadoop.hbase.wal.WALEdit;
069import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
070import org.apache.hadoop.hbase.wal.WALFactory;
071import org.apache.hadoop.hbase.wal.WALKeyImpl;
072import org.apache.hadoop.hbase.wal.WALProvider;
073import org.apache.hadoop.hbase.wal.WALStreamReader;
074import org.junit.AfterClass;
075import org.junit.BeforeClass;
076import org.junit.ClassRule;
077import org.junit.Test;
078import org.junit.experimental.categories.Category;
079import org.mockito.Mockito;
080import org.slf4j.Logger;
081import org.slf4j.LoggerFactory;
082
083import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
084
085@Category({ ReplicationTests.class, MediumTests.class })
086public class TestReplicationSource {
087
088  @ClassRule
089  public static final HBaseClassTestRule CLASS_RULE =
090    HBaseClassTestRule.forClass(TestReplicationSource.class);
091
092  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSource.class);
093  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
094  private final static HBaseTestingUtil TEST_UTIL_PEER = new HBaseTestingUtil();
095  private static FileSystem FS;
096  private static Path oldLogDir;
097  private static Path logDir;
098  private static Configuration conf = TEST_UTIL.getConfiguration();
099
100  @BeforeClass
101  public static void setUpBeforeClass() throws Exception {
102    TEST_UTIL.startMiniDFSCluster(1);
103    FS = TEST_UTIL.getDFSCluster().getFileSystem();
104    Path rootDir = TEST_UTIL.createRootDir();
105    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
106    if (FS.exists(oldLogDir)) {
107      FS.delete(oldLogDir, true);
108    }
109    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
110    if (FS.exists(logDir)) {
111      FS.delete(logDir, true);
112    }
113  }
114
115  @AfterClass
116  public static void tearDownAfterClass() throws Exception {
117    TEST_UTIL_PEER.shutdownMiniHBaseCluster();
118    TEST_UTIL.shutdownMiniHBaseCluster();
119    TEST_UTIL.shutdownMiniDFSCluster();
120  }
121
122  /**
123   * Test the default ReplicationSource skips queuing hbase:meta WAL files.
124   */
125  @Test
126  public void testDefaultSkipsMetaWAL() throws IOException {
127    ReplicationSource rs = new ReplicationSource();
128    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
129    conf.setInt("replication.source.maxretriesmultiplier", 1);
130    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
131    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
132    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
133    ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
134    Mockito.when(peerConfig.getReplicationEndpointImpl())
135      .thenReturn(DoNothingReplicationEndpoint.class.getName());
136    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
137    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
138    Mockito.when(manager.getGlobalMetrics())
139      .thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
140
141    RegionServerServices rss =
142      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
143    ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), "qid");
144    rs.init(conf, null, manager, null, mockPeer, rss,
145      new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(),
146      new MetricsSource(queueId.toString()));
147    try {
148      rs.startup();
149      assertTrue(rs.isSourceActive());
150      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
151      rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
152      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
153      rs.enqueueLog(new Path("a.1"));
154      assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
155    } finally {
156      rs.terminate("Done");
157      rss.stop("Done");
158    }
159  }
160
161  /**
162   * Test that we filter out meta edits, etc.
163   */
164  @Test
165  public void testWALEntryFilter() throws IOException {
166    // To get the fully constructed default WALEntryFilter, need to create a ReplicationSource
167    // instance and init it.
168    ReplicationSource rs = new ReplicationSource();
169    UUID uuid = UUID.randomUUID();
170    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
171    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
172    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
173    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
174    ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
175    Mockito.when(peerConfig.getReplicationEndpointImpl())
176      .thenReturn(DoNothingReplicationEndpoint.class.getName());
177    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
178    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
179    RegionServerServices rss =
180      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
181    ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), "qid");
182    rs.init(conf, null, manager, null, mockPeer, rss,
183      new ReplicationQueueData(queueId, ImmutableMap.of()), uuid, p -> OptionalLong.empty(),
184      new MetricsSource(queueId.toString()));
185    try {
186      rs.startup();
187      TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null);
188      WALEntryFilter wef = rs.getWalEntryFilter();
189      // Test non-system WAL edit.
190      WALEdit we = WALEditInternalHelper.addExtendedCell(new WALEdit(),
191        ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
192          .setRow(HConstants.EMPTY_START_ROW).setFamily(HConstants.CATALOG_FAMILY)
193          .setType(Cell.Type.Put).build());
194      WAL.Entry e = new WAL.Entry(
195        new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.valueOf("test"), -1, -1, uuid), we);
196      assertTrue(wef.filter(e) == e);
197      // Test system WAL edit.
198      e = new WAL.Entry(
199        new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.META_TABLE_NAME, -1, -1, uuid), we);
200      assertNull(wef.filter(e));
201    } finally {
202      rs.terminate("Done");
203      rss.stop("Done");
204    }
205  }
206
207  /**
208   * Sanity check that we can move logs around while we are reading from them. Should this test
209   * fail, ReplicationSource would have a hard time reading logs that are being archived.
210   */
211  // This tests doesn't belong in here... it is not about ReplicationSource.
212  @Test
213  public void testLogMoving() throws Exception {
214    Path logPath = new Path(logDir, "log");
215    if (!FS.exists(logDir)) {
216      FS.mkdirs(logDir);
217    }
218    if (!FS.exists(oldLogDir)) {
219      FS.mkdirs(oldLogDir);
220    }
221    WALProvider.Writer writer =
222      WALFactory.createWALWriter(FS, logPath, TEST_UTIL.getConfiguration());
223    for (int i = 0; i < 3; i++) {
224      byte[] b = Bytes.toBytes(Integer.toString(i));
225      KeyValue kv = new KeyValue(b, b, b);
226      WALEdit edit = new WALEdit();
227      WALEditInternalHelper.addExtendedCell(edit, kv);
228      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID);
229      writer.append(new WAL.Entry(key, edit));
230      writer.sync(false);
231    }
232    writer.close();
233
234    WALStreamReader reader =
235      WALFactory.createStreamReader(FS, logPath, TEST_UTIL.getConfiguration());
236    WAL.Entry entry = reader.next();
237    assertNotNull(entry);
238
239    Path oldLogPath = new Path(oldLogDir, "log");
240    FS.rename(logPath, oldLogPath);
241
242    entry = reader.next();
243    assertNotNull(entry);
244
245    reader.next();
246    entry = reader.next();
247
248    assertNull(entry);
249    reader.close();
250  }
251
252  /**
253   * Tests that {@link ReplicationSource#terminate(String)} will timeout properly Moved here from
254   * TestReplicationSource because doesn't need cluster.
255   */
256  @Test
257  public void testTerminateTimeout() throws Exception {
258    ReplicationSource source = new ReplicationSource();
259    ReplicationEndpoint replicationEndpoint = new DoNothingReplicationEndpoint();
260    try {
261      replicationEndpoint.start();
262      ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
263      Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
264      Configuration testConf = HBaseConfiguration.create();
265      testConf.setInt("replication.source.maxretriesmultiplier", 1);
266      ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
267      ReplicationQueueId queueId =
268        new ReplicationQueueId(ServerName.valueOf("test,123,123"), "testPeer");
269      source.init(testConf, null, manager, null, mockPeer, null,
270        new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(),
271        null);
272      ExecutorService executor = Executors.newSingleThreadExecutor();
273      Future<?> future = executor.submit(() -> source.terminate("testing source termination"));
274      long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
275      Waiter.waitFor(testConf, sleepForRetries * 2, (Waiter.Predicate<Exception>) future::isDone);
276    } finally {
277      replicationEndpoint.stop();
278    }
279  }
280
281  @Test
282  public void testTerminateClearsBuffer() throws Exception {
283    ReplicationSource source = new ReplicationSource();
284    ReplicationSourceManager mockManager = new ReplicationSourceManager(null, null, conf, null,
285      null, null, null, null, null, null, mock(MetricsReplicationGlobalSourceSource.class));
286    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
287    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
288    Configuration testConf = HBaseConfiguration.create();
289    ReplicationQueueId queueId =
290      new ReplicationQueueId(ServerName.valueOf("test,123,123"), "testPeer");
291    source.init(testConf, null, mockManager, null, mockPeer, Mockito.mock(Server.class),
292      new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(),
293      mock(MetricsSource.class));
294    ReplicationSourceWALReader reader =
295      new ReplicationSourceWALReader(null, conf, null, 0, null, source, null);
296    ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, source, reader);
297    source.workerThreads.put("testPeer", shipper);
298    WALEntryBatch batch = new WALEntryBatch(10, logDir);
299    WAL.Entry mockEntry = mock(WAL.Entry.class);
300    WALEdit mockEdit = mock(WALEdit.class);
301    WALKeyImpl mockKey = mock(WALKeyImpl.class);
302    when(mockEntry.getEdit()).thenReturn(mockEdit);
303    when(mockEdit.isEmpty()).thenReturn(false);
304    when(mockEntry.getKey()).thenReturn(mockKey);
305    when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L);
306    when(mockEdit.heapSize()).thenReturn(10000L);
307    when(mockEdit.size()).thenReturn(0);
308    ArrayList<Cell> cells = new ArrayList<>();
309    KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"), Bytes.toBytes("1"),
310      Bytes.toBytes("v1"));
311    cells.add(kv);
312    when(mockEdit.getCells()).thenReturn(cells);
313    reader.addEntryToBatch(batch, mockEntry);
314    reader.entryBatchQueue.put(batch);
315    source.terminate("test");
316    assertEquals(0, source.getSourceManager().getTotalBufferUsed());
317  }
318
319  /**
320   * Tests that recovered queues are preserved on a regionserver shutdown. See HBASE-18192
321   */
322  @Test
323  public void testServerShutdownRecoveredQueue() throws Exception {
324    try {
325      // Ensure single-threaded WAL
326      conf.set("hbase.wal.provider", "defaultProvider");
327      conf.setInt("replication.sleep.before.failover", 2000);
328      // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
329      conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
330      SingleProcessHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
331      TEST_UTIL_PEER.startMiniCluster(1);
332
333      HRegionServer serverA = cluster.getRegionServer(0);
334      final ReplicationSourceManager managerA =
335        serverA.getReplicationSourceService().getReplicationManager();
336      HRegionServer serverB = cluster.getRegionServer(1);
337      final ReplicationSourceManager managerB =
338        serverB.getReplicationSourceService().getReplicationManager();
339      final Admin admin = TEST_UTIL.getAdmin();
340
341      final String peerId = "TestPeer";
342      admin.addReplicationPeer(peerId, ReplicationPeerConfig.newBuilder()
343        .setClusterKey(TEST_UTIL_PEER.getRpcConnnectionURI()).build());
344      // Wait for replication sources to come up
345      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
346        @Override
347        public boolean evaluate() {
348          return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
349        }
350      });
351      // Disabling peer makes sure there is at least one log to claim when the server dies
352      // The recovered queue will also stay there until the peer is disabled even if the
353      // WALs it contains have no data.
354      admin.disableReplicationPeer(peerId);
355
356      // Stopping serverA
357      // It's queues should be claimed by the only other alive server i.e. serverB
358      cluster.stopRegionServer(serverA.getServerName());
359      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
360        @Override
361        public boolean evaluate() throws Exception {
362          return managerB.getOldSources().size() == 1;
363        }
364      });
365
366      final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
367      serverC.waitForServerOnline();
368      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
369        @Override
370        public boolean evaluate() throws Exception {
371          return serverC.getReplicationSourceService() != null;
372        }
373      });
374      final ReplicationSourceManager managerC =
375        ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
376      // Sanity check
377      assertEquals(0, managerC.getOldSources().size());
378
379      // Stopping serverB
380      // Now serverC should have two recovered queues:
381      // 1. The serverB's normal queue
382      // 2. serverA's recovered queue on serverB
383      cluster.stopRegionServer(serverB.getServerName());
384      Waiter.waitFor(conf, 20000,
385        (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 2);
386      admin.enableReplicationPeer(peerId);
387      Waiter.waitFor(conf, 20000,
388        (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 0);
389    } finally {
390      conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
391    }
392  }
393
394  /**
395   * Regionserver implementation that adds a delay on the graceful shutdown.
396   */
397  public static class ShutdownDelayRegionServer extends HRegionServer {
398    public ShutdownDelayRegionServer(Configuration conf) throws IOException {
399      super(conf);
400    }
401
402    @Override
403    protected void stopServiceThreads() {
404      // Add a delay before service threads are shutdown.
405      // This will keep the zookeeper connection alive for the duration of the delay.
406      LOG.info("Adding a delay to the regionserver shutdown");
407      try {
408        Thread.sleep(2000);
409      } catch (InterruptedException ex) {
410        LOG.error("Interrupted while sleeping");
411      }
412      super.stopServiceThreads();
413    }
414  }
415
416  /**
417   * Deadend Endpoint. Does nothing.
418   */
419  public static class DoNothingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint {
420    private final UUID uuid = UUID.randomUUID();
421
422    @Override
423    public void init(Context context) throws IOException {
424      this.ctx = context;
425    }
426
427    @Override
428    public WALEntryFilter getWALEntryfilter() {
429      return null;
430    }
431
432    @Override
433    public synchronized UUID getPeerUUID() {
434      return this.uuid;
435    }
436
437    @Override
438    protected void doStart() {
439      notifyStarted();
440    }
441
442    @Override
443    protected void doStop() {
444      notifyStopped();
445    }
446
447    @Override
448    public boolean canReplicateToSameCluster() {
449      return true;
450    }
451  }
452
453  /**
454   * Deadend Endpoint. Does nothing.
455   */
456  public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint {
457
458    static int count = 0;
459
460    @Override
461    public synchronized UUID getPeerUUID() {
462      if (count == 0) {
463        count++;
464        throw new RuntimeException();
465      } else {
466        return super.getPeerUUID();
467      }
468    }
469
470  }
471
472  /**
473   * Bad Endpoint with failing connection to peer on demand.
474   */
475  public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint {
476    static boolean failing = true;
477
478    @Override
479    public synchronized UUID getPeerUUID() {
480      return failing ? null : super.getPeerUUID();
481    }
482  }
483
484  public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint {
485
486    static int count = 0;
487
488    @Override
489    public synchronized UUID getPeerUUID() {
490      throw new RuntimeException();
491    }
492
493  }
494
495  private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf,
496    String endpointName) throws IOException {
497    conf.setInt("replication.source.maxretriesmultiplier", 1);
498    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
499    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
500    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
501    ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
502    FaultyReplicationEndpoint.count = 0;
503    Mockito.when(peerConfig.getReplicationEndpointImpl()).thenReturn(endpointName);
504    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
505    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
506    Mockito.when(manager.getGlobalMetrics())
507      .thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
508    RegionServerServices rss =
509      TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
510    ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), "qid");
511    rs.init(conf, null, manager, null, mockPeer, rss,
512      new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(),
513      new MetricsSource(queueId.toString()));
514    return rss;
515  }
516
517  /**
518   * Test ReplicationSource retries startup once an uncaught exception happens during initialization
519   * and <b>eplication.source.regionserver.abort</b> is set to false.
520   */
521  @Test
522  public void testAbortFalseOnError() throws IOException {
523    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
524    conf.setBoolean("replication.source.regionserver.abort", false);
525    ReplicationSource rs = new ReplicationSource();
526    RegionServerServices rss =
527      setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName());
528    try {
529      rs.startup();
530      assertTrue(rs.isSourceActive());
531      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
532      rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
533      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
534      rs.enqueueLog(new Path("a.1"));
535      assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
536    } finally {
537      rs.terminate("Done");
538      rss.stop("Done");
539    }
540  }
541
542  @Test
543  public void testReplicationSourceInitializingMetric() throws IOException {
544    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
545    conf.setBoolean("replication.source.regionserver.abort", false);
546    ReplicationSource rs = new ReplicationSource();
547    RegionServerServices rss = setupForAbortTests(rs, conf, BadReplicationEndpoint.class.getName());
548    try {
549      rs.startup();
550      assertTrue(rs.isSourceActive());
551      Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 1);
552      BadReplicationEndpoint.failing = false;
553      Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 0);
554    } finally {
555      rs.terminate("Done");
556      rss.stop("Done");
557    }
558  }
559
560  /**
561   * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread,
562   * when <b>replication.source.regionserver.abort</b> is set to false.
563   */
564  @Test
565  public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException {
566    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
567    ReplicationSource rs = new ReplicationSource();
568    RegionServerServices rss =
569      setupForAbortTests(rs, conf, FaultyReplicationEndpoint.class.getName());
570    try {
571      rs.startup();
572      assertTrue(true);
573    } finally {
574      rs.terminate("Done");
575      rss.stop("Done");
576    }
577  }
578
579  /**
580   * Test ReplicationSource retries startup once an uncaught exception happens during initialization
581   * and <b>replication.source.regionserver.abort</b> is set to true.
582   */
583  @Test
584  public void testAbortTrueOnError() throws IOException {
585    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
586    ReplicationSource rs = new ReplicationSource();
587    RegionServerServices rss =
588      setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName());
589    try {
590      rs.startup();
591      assertTrue(rs.isSourceActive());
592      Waiter.waitFor(conf, 1000, () -> rss.isAborted());
593      assertTrue(rss.isAborted());
594      Waiter.waitFor(conf, 1000, () -> !rs.isSourceActive());
595      assertFalse(rs.isSourceActive());
596    } finally {
597      rs.terminate("Done");
598      rss.stop("Done");
599    }
600  }
601
602  /*
603   * Test age of oldest wal metric.
604   */
605  @Test
606  public void testAgeOfOldestWal() throws Exception {
607    try {
608      ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
609      EnvironmentEdgeManager.injectEdge(manualEdge);
610
611      String peerId = "1";
612      MetricsSource metrics = new MetricsSource(peerId);
613      Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
614      conf.setInt("replication.source.maxretriesmultiplier", 1);
615      ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
616      Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
617      Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
618      ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
619      Mockito.when(peerConfig.getReplicationEndpointImpl())
620        .thenReturn(DoNothingReplicationEndpoint.class.getName());
621      Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
622      ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
623      Mockito.when(manager.getGlobalMetrics())
624        .thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
625      RegionServerServices rss =
626        TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
627      ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), peerId);
628      ReplicationSource source = new ReplicationSource();
629      source.init(conf, null, manager, null, mockPeer, rss,
630        new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(),
631        metrics);
632
633      final Path log1 = new Path(logDir, "log-walgroup-a.8");
634      manualEdge.setValue(10);
635      // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2.
636      source.enqueueLog(log1);
637      MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(peerId);
638      assertEquals(2, metricsSource1.getOldestWalAge());
639
640      final Path log2 = new Path(logDir, "log-walgroup-b.4");
641      // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6
642      source.enqueueLog(log2);
643      assertEquals(6, metricsSource1.getOldestWalAge());
644      // Clear all metrics.
645      metrics.clear();
646    } finally {
647      EnvironmentEdgeManager.reset();
648    }
649  }
650
651  private MetricsReplicationSourceSource getSourceMetrics(String sourceId) {
652    MetricsReplicationSourceFactoryImpl factory =
653      (MetricsReplicationSourceFactoryImpl) CompatibilitySingletonFactory
654        .getInstance(MetricsReplicationSourceFactory.class);
655    return factory.getSource(sourceId);
656  }
657}