001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertTrue; 026import static org.mockito.Mockito.mock; 027import static org.mockito.Mockito.when; 028 029import java.io.IOException; 030import java.util.ArrayList; 031import java.util.OptionalLong; 032import java.util.UUID; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.Executors; 035import java.util.concurrent.Future; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.hbase.Cell; 040import org.apache.hadoop.hbase.CellBuilderType; 041import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 042import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import org.apache.hadoop.hbase.HBaseConfiguration; 045import org.apache.hadoop.hbase.HBaseTestingUtil; 046import org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.KeyValue; 048import org.apache.hadoop.hbase.Server; 049import org.apache.hadoop.hbase.ServerName; 050import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 051import org.apache.hadoop.hbase.TableName; 052import org.apache.hadoop.hbase.Waiter; 053import org.apache.hadoop.hbase.client.Admin; 054import org.apache.hadoop.hbase.regionserver.HRegionServer; 055import org.apache.hadoop.hbase.regionserver.RegionServerServices; 056import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 057import org.apache.hadoop.hbase.replication.ReplicationPeer; 058import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 059import org.apache.hadoop.hbase.replication.ReplicationQueueData; 060import org.apache.hadoop.hbase.replication.ReplicationQueueId; 061import org.apache.hadoop.hbase.replication.WALEntryFilter; 062import org.apache.hadoop.hbase.testclassification.MediumTests; 063import org.apache.hadoop.hbase.testclassification.ReplicationTests; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 066import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 067import org.apache.hadoop.hbase.wal.WAL; 068import org.apache.hadoop.hbase.wal.WALEdit; 069import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 070import org.apache.hadoop.hbase.wal.WALFactory; 071import org.apache.hadoop.hbase.wal.WALKeyImpl; 072import org.apache.hadoop.hbase.wal.WALProvider; 073import org.apache.hadoop.hbase.wal.WALStreamReader; 074import org.junit.AfterClass; 075import org.junit.BeforeClass; 076import org.junit.ClassRule; 077import org.junit.Test; 078import org.junit.experimental.categories.Category; 079import org.mockito.Mockito; 080import org.slf4j.Logger; 081import org.slf4j.LoggerFactory; 082 083import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 084 085@Category({ ReplicationTests.class, MediumTests.class }) 086public class TestReplicationSource { 087 088 @ClassRule 089 public static final HBaseClassTestRule CLASS_RULE = 090 HBaseClassTestRule.forClass(TestReplicationSource.class); 091 092 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSource.class); 093 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 094 private final static HBaseTestingUtil TEST_UTIL_PEER = new HBaseTestingUtil(); 095 private static FileSystem FS; 096 private static Path oldLogDir; 097 private static Path logDir; 098 private static Configuration conf = TEST_UTIL.getConfiguration(); 099 100 @BeforeClass 101 public static void setUpBeforeClass() throws Exception { 102 TEST_UTIL.startMiniDFSCluster(1); 103 FS = TEST_UTIL.getDFSCluster().getFileSystem(); 104 Path rootDir = TEST_UTIL.createRootDir(); 105 oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); 106 if (FS.exists(oldLogDir)) { 107 FS.delete(oldLogDir, true); 108 } 109 logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); 110 if (FS.exists(logDir)) { 111 FS.delete(logDir, true); 112 } 113 } 114 115 @AfterClass 116 public static void tearDownAfterClass() throws Exception { 117 TEST_UTIL_PEER.shutdownMiniHBaseCluster(); 118 TEST_UTIL.shutdownMiniHBaseCluster(); 119 TEST_UTIL.shutdownMiniDFSCluster(); 120 } 121 122 /** 123 * Test the default ReplicationSource skips queuing hbase:meta WAL files. 124 */ 125 @Test 126 public void testDefaultSkipsMetaWAL() throws IOException { 127 ReplicationSource rs = new ReplicationSource(); 128 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 129 conf.setInt("replication.source.maxretriesmultiplier", 1); 130 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 131 Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); 132 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 133 ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); 134 Mockito.when(peerConfig.getReplicationEndpointImpl()) 135 .thenReturn(DoNothingReplicationEndpoint.class.getName()); 136 Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 137 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 138 Mockito.when(manager.getGlobalMetrics()) 139 .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); 140 141 RegionServerServices rss = 142 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 143 ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), "qid"); 144 rs.init(conf, null, manager, null, mockPeer, rss, 145 new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(), 146 new MetricsSource(queueId.toString())); 147 try { 148 rs.startup(); 149 assertTrue(rs.isSourceActive()); 150 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 151 rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID)); 152 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 153 rs.enqueueLog(new Path("a.1")); 154 assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue()); 155 } finally { 156 rs.terminate("Done"); 157 rss.stop("Done"); 158 } 159 } 160 161 /** 162 * Test that we filter out meta edits, etc. 163 */ 164 @Test 165 public void testWALEntryFilter() throws IOException { 166 // To get the fully constructed default WALEntryFilter, need to create a ReplicationSource 167 // instance and init it. 168 ReplicationSource rs = new ReplicationSource(); 169 UUID uuid = UUID.randomUUID(); 170 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 171 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 172 Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); 173 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 174 ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); 175 Mockito.when(peerConfig.getReplicationEndpointImpl()) 176 .thenReturn(DoNothingReplicationEndpoint.class.getName()); 177 Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 178 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 179 RegionServerServices rss = 180 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 181 ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), "qid"); 182 rs.init(conf, null, manager, null, mockPeer, rss, 183 new ReplicationQueueData(queueId, ImmutableMap.of()), uuid, p -> OptionalLong.empty(), 184 new MetricsSource(queueId.toString())); 185 try { 186 rs.startup(); 187 TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null); 188 WALEntryFilter wef = rs.getWalEntryFilter(); 189 // Test non-system WAL edit. 190 WALEdit we = WALEditInternalHelper.addExtendedCell(new WALEdit(), 191 ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY) 192 .setRow(HConstants.EMPTY_START_ROW).setFamily(HConstants.CATALOG_FAMILY) 193 .setType(Cell.Type.Put).build()); 194 WAL.Entry e = new WAL.Entry( 195 new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.valueOf("test"), -1, -1, uuid), we); 196 assertTrue(wef.filter(e) == e); 197 // Test system WAL edit. 198 e = new WAL.Entry( 199 new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.META_TABLE_NAME, -1, -1, uuid), we); 200 assertNull(wef.filter(e)); 201 } finally { 202 rs.terminate("Done"); 203 rss.stop("Done"); 204 } 205 } 206 207 /** 208 * Sanity check that we can move logs around while we are reading from them. Should this test 209 * fail, ReplicationSource would have a hard time reading logs that are being archived. 210 */ 211 // This tests doesn't belong in here... it is not about ReplicationSource. 212 @Test 213 public void testLogMoving() throws Exception { 214 Path logPath = new Path(logDir, "log"); 215 if (!FS.exists(logDir)) { 216 FS.mkdirs(logDir); 217 } 218 if (!FS.exists(oldLogDir)) { 219 FS.mkdirs(oldLogDir); 220 } 221 WALProvider.Writer writer = 222 WALFactory.createWALWriter(FS, logPath, TEST_UTIL.getConfiguration()); 223 for (int i = 0; i < 3; i++) { 224 byte[] b = Bytes.toBytes(Integer.toString(i)); 225 KeyValue kv = new KeyValue(b, b, b); 226 WALEdit edit = new WALEdit(); 227 WALEditInternalHelper.addExtendedCell(edit, kv); 228 WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID); 229 writer.append(new WAL.Entry(key, edit)); 230 writer.sync(false); 231 } 232 writer.close(); 233 234 WALStreamReader reader = 235 WALFactory.createStreamReader(FS, logPath, TEST_UTIL.getConfiguration()); 236 WAL.Entry entry = reader.next(); 237 assertNotNull(entry); 238 239 Path oldLogPath = new Path(oldLogDir, "log"); 240 FS.rename(logPath, oldLogPath); 241 242 entry = reader.next(); 243 assertNotNull(entry); 244 245 reader.next(); 246 entry = reader.next(); 247 248 assertNull(entry); 249 reader.close(); 250 } 251 252 /** 253 * Tests that {@link ReplicationSource#terminate(String)} will timeout properly Moved here from 254 * TestReplicationSource because doesn't need cluster. 255 */ 256 @Test 257 public void testTerminateTimeout() throws Exception { 258 ReplicationSource source = new ReplicationSource(); 259 ReplicationEndpoint replicationEndpoint = new DoNothingReplicationEndpoint(); 260 try { 261 replicationEndpoint.start(); 262 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 263 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 264 Configuration testConf = HBaseConfiguration.create(); 265 testConf.setInt("replication.source.maxretriesmultiplier", 1); 266 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 267 ReplicationQueueId queueId = 268 new ReplicationQueueId(ServerName.valueOf("test,123,123"), "testPeer"); 269 source.init(testConf, null, manager, null, mockPeer, null, 270 new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(), 271 null); 272 ExecutorService executor = Executors.newSingleThreadExecutor(); 273 Future<?> future = executor.submit(() -> source.terminate("testing source termination")); 274 long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000); 275 Waiter.waitFor(testConf, sleepForRetries * 2, (Waiter.Predicate<Exception>) future::isDone); 276 } finally { 277 replicationEndpoint.stop(); 278 } 279 } 280 281 @Test 282 public void testTerminateClearsBuffer() throws Exception { 283 ReplicationSource source = new ReplicationSource(); 284 ReplicationSourceManager mockManager = new ReplicationSourceManager(null, null, conf, null, 285 null, null, null, null, null, null, mock(MetricsReplicationGlobalSourceSource.class)); 286 ReplicationPeer mockPeer = mock(ReplicationPeer.class); 287 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 288 Configuration testConf = HBaseConfiguration.create(); 289 ReplicationQueueId queueId = 290 new ReplicationQueueId(ServerName.valueOf("test,123,123"), "testPeer"); 291 source.init(testConf, null, mockManager, null, mockPeer, Mockito.mock(Server.class), 292 new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(), 293 mock(MetricsSource.class)); 294 ReplicationSourceWALReader reader = 295 new ReplicationSourceWALReader(null, conf, null, 0, null, source, null); 296 ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, source, reader); 297 source.workerThreads.put("testPeer", shipper); 298 WALEntryBatch batch = new WALEntryBatch(10, logDir); 299 WAL.Entry mockEntry = mock(WAL.Entry.class); 300 WALEdit mockEdit = mock(WALEdit.class); 301 WALKeyImpl mockKey = mock(WALKeyImpl.class); 302 when(mockEntry.getEdit()).thenReturn(mockEdit); 303 when(mockEdit.isEmpty()).thenReturn(false); 304 when(mockEntry.getKey()).thenReturn(mockKey); 305 when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L); 306 when(mockEdit.heapSize()).thenReturn(10000L); 307 when(mockEdit.size()).thenReturn(0); 308 ArrayList<Cell> cells = new ArrayList<>(); 309 KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"), Bytes.toBytes("1"), 310 Bytes.toBytes("v1")); 311 cells.add(kv); 312 when(mockEdit.getCells()).thenReturn(cells); 313 reader.addEntryToBatch(batch, mockEntry); 314 reader.entryBatchQueue.put(batch); 315 source.terminate("test"); 316 assertEquals(0, source.getSourceManager().getTotalBufferUsed()); 317 } 318 319 /** 320 * Tests that recovered queues are preserved on a regionserver shutdown. See HBASE-18192 321 */ 322 @Test 323 public void testServerShutdownRecoveredQueue() throws Exception { 324 try { 325 // Ensure single-threaded WAL 326 conf.set("hbase.wal.provider", "defaultProvider"); 327 conf.setInt("replication.sleep.before.failover", 2000); 328 // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in. 329 conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName()); 330 SingleProcessHBaseCluster cluster = TEST_UTIL.startMiniCluster(2); 331 TEST_UTIL_PEER.startMiniCluster(1); 332 333 HRegionServer serverA = cluster.getRegionServer(0); 334 final ReplicationSourceManager managerA = 335 serverA.getReplicationSourceService().getReplicationManager(); 336 HRegionServer serverB = cluster.getRegionServer(1); 337 final ReplicationSourceManager managerB = 338 serverB.getReplicationSourceService().getReplicationManager(); 339 final Admin admin = TEST_UTIL.getAdmin(); 340 341 final String peerId = "TestPeer"; 342 admin.addReplicationPeer(peerId, ReplicationPeerConfig.newBuilder() 343 .setClusterKey(TEST_UTIL_PEER.getRpcConnnectionURI()).build()); 344 // Wait for replication sources to come up 345 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 346 @Override 347 public boolean evaluate() { 348 return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty()); 349 } 350 }); 351 // Disabling peer makes sure there is at least one log to claim when the server dies 352 // The recovered queue will also stay there until the peer is disabled even if the 353 // WALs it contains have no data. 354 admin.disableReplicationPeer(peerId); 355 356 // Stopping serverA 357 // It's queues should be claimed by the only other alive server i.e. serverB 358 cluster.stopRegionServer(serverA.getServerName()); 359 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 360 @Override 361 public boolean evaluate() throws Exception { 362 return managerB.getOldSources().size() == 1; 363 } 364 }); 365 366 final HRegionServer serverC = cluster.startRegionServer().getRegionServer(); 367 serverC.waitForServerOnline(); 368 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 369 @Override 370 public boolean evaluate() throws Exception { 371 return serverC.getReplicationSourceService() != null; 372 } 373 }); 374 final ReplicationSourceManager managerC = 375 ((Replication) serverC.getReplicationSourceService()).getReplicationManager(); 376 // Sanity check 377 assertEquals(0, managerC.getOldSources().size()); 378 379 // Stopping serverB 380 // Now serverC should have two recovered queues: 381 // 1. The serverB's normal queue 382 // 2. serverA's recovered queue on serverB 383 cluster.stopRegionServer(serverB.getServerName()); 384 Waiter.waitFor(conf, 20000, 385 (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 2); 386 admin.enableReplicationPeer(peerId); 387 Waiter.waitFor(conf, 20000, 388 (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 0); 389 } finally { 390 conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName()); 391 } 392 } 393 394 /** 395 * Regionserver implementation that adds a delay on the graceful shutdown. 396 */ 397 public static class ShutdownDelayRegionServer extends HRegionServer { 398 public ShutdownDelayRegionServer(Configuration conf) throws IOException { 399 super(conf); 400 } 401 402 @Override 403 protected void stopServiceThreads() { 404 // Add a delay before service threads are shutdown. 405 // This will keep the zookeeper connection alive for the duration of the delay. 406 LOG.info("Adding a delay to the regionserver shutdown"); 407 try { 408 Thread.sleep(2000); 409 } catch (InterruptedException ex) { 410 LOG.error("Interrupted while sleeping"); 411 } 412 super.stopServiceThreads(); 413 } 414 } 415 416 /** 417 * Deadend Endpoint. Does nothing. 418 */ 419 public static class DoNothingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint { 420 private final UUID uuid = UUID.randomUUID(); 421 422 @Override 423 public void init(Context context) throws IOException { 424 this.ctx = context; 425 } 426 427 @Override 428 public WALEntryFilter getWALEntryfilter() { 429 return null; 430 } 431 432 @Override 433 public synchronized UUID getPeerUUID() { 434 return this.uuid; 435 } 436 437 @Override 438 protected void doStart() { 439 notifyStarted(); 440 } 441 442 @Override 443 protected void doStop() { 444 notifyStopped(); 445 } 446 447 @Override 448 public boolean canReplicateToSameCluster() { 449 return true; 450 } 451 } 452 453 /** 454 * Deadend Endpoint. Does nothing. 455 */ 456 public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint { 457 458 static int count = 0; 459 460 @Override 461 public synchronized UUID getPeerUUID() { 462 if (count == 0) { 463 count++; 464 throw new RuntimeException(); 465 } else { 466 return super.getPeerUUID(); 467 } 468 } 469 470 } 471 472 /** 473 * Bad Endpoint with failing connection to peer on demand. 474 */ 475 public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint { 476 static boolean failing = true; 477 478 @Override 479 public synchronized UUID getPeerUUID() { 480 return failing ? null : super.getPeerUUID(); 481 } 482 } 483 484 public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint { 485 486 static int count = 0; 487 488 @Override 489 public synchronized UUID getPeerUUID() { 490 throw new RuntimeException(); 491 } 492 493 } 494 495 private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf, 496 String endpointName) throws IOException { 497 conf.setInt("replication.source.maxretriesmultiplier", 1); 498 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 499 Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); 500 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 501 ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); 502 FaultyReplicationEndpoint.count = 0; 503 Mockito.when(peerConfig.getReplicationEndpointImpl()).thenReturn(endpointName); 504 Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 505 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 506 Mockito.when(manager.getGlobalMetrics()) 507 .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); 508 RegionServerServices rss = 509 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 510 ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), "qid"); 511 rs.init(conf, null, manager, null, mockPeer, rss, 512 new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(), 513 new MetricsSource(queueId.toString())); 514 return rss; 515 } 516 517 /** 518 * Test ReplicationSource retries startup once an uncaught exception happens during initialization 519 * and <b>eplication.source.regionserver.abort</b> is set to false. 520 */ 521 @Test 522 public void testAbortFalseOnError() throws IOException { 523 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 524 conf.setBoolean("replication.source.regionserver.abort", false); 525 ReplicationSource rs = new ReplicationSource(); 526 RegionServerServices rss = 527 setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName()); 528 try { 529 rs.startup(); 530 assertTrue(rs.isSourceActive()); 531 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 532 rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID)); 533 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 534 rs.enqueueLog(new Path("a.1")); 535 assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue()); 536 } finally { 537 rs.terminate("Done"); 538 rss.stop("Done"); 539 } 540 } 541 542 @Test 543 public void testReplicationSourceInitializingMetric() throws IOException { 544 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 545 conf.setBoolean("replication.source.regionserver.abort", false); 546 ReplicationSource rs = new ReplicationSource(); 547 RegionServerServices rss = setupForAbortTests(rs, conf, BadReplicationEndpoint.class.getName()); 548 try { 549 rs.startup(); 550 assertTrue(rs.isSourceActive()); 551 Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 1); 552 BadReplicationEndpoint.failing = false; 553 Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 0); 554 } finally { 555 rs.terminate("Done"); 556 rss.stop("Done"); 557 } 558 } 559 560 /** 561 * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread, 562 * when <b>replication.source.regionserver.abort</b> is set to false. 563 */ 564 @Test 565 public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException { 566 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 567 ReplicationSource rs = new ReplicationSource(); 568 RegionServerServices rss = 569 setupForAbortTests(rs, conf, FaultyReplicationEndpoint.class.getName()); 570 try { 571 rs.startup(); 572 assertTrue(true); 573 } finally { 574 rs.terminate("Done"); 575 rss.stop("Done"); 576 } 577 } 578 579 /** 580 * Test ReplicationSource retries startup once an uncaught exception happens during initialization 581 * and <b>replication.source.regionserver.abort</b> is set to true. 582 */ 583 @Test 584 public void testAbortTrueOnError() throws IOException { 585 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 586 ReplicationSource rs = new ReplicationSource(); 587 RegionServerServices rss = 588 setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName()); 589 try { 590 rs.startup(); 591 assertTrue(rs.isSourceActive()); 592 Waiter.waitFor(conf, 1000, () -> rss.isAborted()); 593 assertTrue(rss.isAborted()); 594 Waiter.waitFor(conf, 1000, () -> !rs.isSourceActive()); 595 assertFalse(rs.isSourceActive()); 596 } finally { 597 rs.terminate("Done"); 598 rss.stop("Done"); 599 } 600 } 601 602 /* 603 * Test age of oldest wal metric. 604 */ 605 @Test 606 public void testAgeOfOldestWal() throws Exception { 607 try { 608 ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); 609 EnvironmentEdgeManager.injectEdge(manualEdge); 610 611 String peerId = "1"; 612 MetricsSource metrics = new MetricsSource(peerId); 613 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 614 conf.setInt("replication.source.maxretriesmultiplier", 1); 615 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 616 Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); 617 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 618 ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); 619 Mockito.when(peerConfig.getReplicationEndpointImpl()) 620 .thenReturn(DoNothingReplicationEndpoint.class.getName()); 621 Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 622 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 623 Mockito.when(manager.getGlobalMetrics()) 624 .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); 625 RegionServerServices rss = 626 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 627 ReplicationQueueId queueId = new ReplicationQueueId(rss.getServerName(), peerId); 628 ReplicationSource source = new ReplicationSource(); 629 source.init(conf, null, manager, null, mockPeer, rss, 630 new ReplicationQueueData(queueId, ImmutableMap.of()), null, p -> OptionalLong.empty(), 631 metrics); 632 633 final Path log1 = new Path(logDir, "log-walgroup-a.8"); 634 manualEdge.setValue(10); 635 // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2. 636 source.enqueueLog(log1); 637 MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(peerId); 638 assertEquals(2, metricsSource1.getOldestWalAge()); 639 640 final Path log2 = new Path(logDir, "log-walgroup-b.4"); 641 // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6 642 source.enqueueLog(log2); 643 assertEquals(6, metricsSource1.getOldestWalAge()); 644 // Clear all metrics. 645 metrics.clear(); 646 } finally { 647 EnvironmentEdgeManager.reset(); 648 } 649 } 650 651 private MetricsReplicationSourceSource getSourceMetrics(String sourceId) { 652 MetricsReplicationSourceFactoryImpl factory = 653 (MetricsReplicationSourceFactoryImpl) CompatibilitySingletonFactory 654 .getInstance(MetricsReplicationSourceFactory.class); 655 return factory.getSource(sourceId); 656 } 657}