001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNull; 025import static org.junit.Assert.assertTrue; 026import static org.mockito.Mockito.doNothing; 027import static org.mockito.Mockito.mock; 028import static org.mockito.Mockito.when; 029 030import java.io.IOException; 031import java.util.ArrayList; 032import java.util.OptionalLong; 033import java.util.UUID; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import java.util.concurrent.Future; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.fs.FileSystem; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CellBuilderFactory; 042import org.apache.hadoop.hbase.CellBuilderType; 043import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 044import org.apache.hadoop.hbase.HBaseClassTestRule; 045import org.apache.hadoop.hbase.HBaseConfiguration; 046import org.apache.hadoop.hbase.HBaseTestingUtility; 047import org.apache.hadoop.hbase.HConstants; 048import org.apache.hadoop.hbase.KeyValue; 049import org.apache.hadoop.hbase.MiniHBaseCluster; 050import org.apache.hadoop.hbase.Server; 051import org.apache.hadoop.hbase.ServerName; 052import org.apache.hadoop.hbase.TableName; 053import org.apache.hadoop.hbase.Waiter; 054import org.apache.hadoop.hbase.client.Admin; 055import org.apache.hadoop.hbase.regionserver.HRegionServer; 056import org.apache.hadoop.hbase.regionserver.RegionServerServices; 057import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 058import org.apache.hadoop.hbase.replication.ReplicationPeer; 059import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 060import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 061import org.apache.hadoop.hbase.replication.WALEntryFilter; 062import org.apache.hadoop.hbase.testclassification.MediumTests; 063import org.apache.hadoop.hbase.testclassification.ReplicationTests; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 066import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 067import org.apache.hadoop.hbase.wal.WAL; 068import org.apache.hadoop.hbase.wal.WALEdit; 069import org.apache.hadoop.hbase.wal.WALFactory; 070import org.apache.hadoop.hbase.wal.WALKeyImpl; 071import org.apache.hadoop.hbase.wal.WALProvider; 072import org.apache.hadoop.hbase.wal.WALStreamReader; 073import org.junit.AfterClass; 074import org.junit.BeforeClass; 075import org.junit.ClassRule; 076import org.junit.Test; 077import org.junit.experimental.categories.Category; 078import org.mockito.Mockito; 079import org.slf4j.Logger; 080import org.slf4j.LoggerFactory; 081 082@Category({ ReplicationTests.class, MediumTests.class }) 083public class TestReplicationSource { 084 085 @ClassRule 086 public static final HBaseClassTestRule CLASS_RULE = 087 HBaseClassTestRule.forClass(TestReplicationSource.class); 088 089 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSource.class); 090 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 091 private final static HBaseTestingUtility TEST_UTIL_PEER = new HBaseTestingUtility(); 092 private static FileSystem FS; 093 private static Path oldLogDir; 094 private static Path logDir; 095 private static Configuration conf = TEST_UTIL.getConfiguration(); 096 097 @BeforeClass 098 public static void setUpBeforeClass() throws Exception { 099 TEST_UTIL.startMiniDFSCluster(1); 100 FS = TEST_UTIL.getDFSCluster().getFileSystem(); 101 Path rootDir = TEST_UTIL.createRootDir(); 102 oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); 103 if (FS.exists(oldLogDir)) { 104 FS.delete(oldLogDir, true); 105 } 106 logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); 107 if (FS.exists(logDir)) { 108 FS.delete(logDir, true); 109 } 110 } 111 112 @AfterClass 113 public static void tearDownAfterClass() throws Exception { 114 TEST_UTIL_PEER.shutdownMiniHBaseCluster(); 115 TEST_UTIL.shutdownMiniHBaseCluster(); 116 TEST_UTIL.shutdownMiniDFSCluster(); 117 } 118 119 /** 120 * Test the default ReplicationSource skips queuing hbase:meta WAL files. 121 */ 122 @Test 123 public void testDefaultSkipsMetaWAL() throws IOException { 124 ReplicationSource rs = new ReplicationSource(); 125 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 126 conf.setInt("replication.source.maxretriesmultiplier", 1); 127 ReplicationPeer mockPeer = mock(ReplicationPeer.class); 128 when(mockPeer.getConfiguration()).thenReturn(conf); 129 when(mockPeer.getPeerBandwidth()).thenReturn(0L); 130 ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); 131 when(peerConfig.getReplicationEndpointImpl()) 132 .thenReturn(DoNothingReplicationEndpoint.class.getName()); 133 when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 134 ReplicationSourceManager manager = mock(ReplicationSourceManager.class); 135 Mockito.when(manager.getGlobalMetrics()) 136 .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); 137 String queueId = "qid"; 138 RegionServerServices rss = 139 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 140 rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, p -> OptionalLong.empty(), 141 new MetricsSource(queueId)); 142 try { 143 rs.startup(); 144 assertTrue(rs.isSourceActive()); 145 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 146 rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID)); 147 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 148 rs.enqueueLog(new Path("a.1")); 149 assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue()); 150 } finally { 151 rs.terminate("Done"); 152 rss.stop("Done"); 153 } 154 } 155 156 /** 157 * Test that we filter out meta edits, etc. 158 */ 159 @Test 160 public void testWALEntryFilter() throws IOException { 161 // To get the fully constructed default WALEntryFilter, need to create a ReplicationSource 162 // instance and init it. 163 ReplicationSource rs = new ReplicationSource(); 164 UUID uuid = UUID.randomUUID(); 165 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 166 ReplicationPeer mockPeer = mock(ReplicationPeer.class); 167 when(mockPeer.getConfiguration()).thenReturn(conf); 168 when(mockPeer.getPeerBandwidth()).thenReturn(0L); 169 ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); 170 when(peerConfig.getReplicationEndpointImpl()) 171 .thenReturn(DoNothingReplicationEndpoint.class.getName()); 172 when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 173 ReplicationSourceManager manager = mock(ReplicationSourceManager.class); 174 String queueId = "qid"; 175 RegionServerServices rss = 176 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 177 rs.init(conf, null, manager, null, mockPeer, rss, queueId, uuid, p -> OptionalLong.empty(), 178 new MetricsSource(queueId)); 179 try { 180 rs.startup(); 181 TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null); 182 WALEntryFilter wef = rs.getWalEntryFilter(); 183 // Test non-system WAL edit. 184 WALEdit we = new WALEdit() 185 .add(CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(HConstants.EMPTY_START_ROW) 186 .setFamily(HConstants.CATALOG_FAMILY).setType(Cell.Type.Put).build()); 187 WAL.Entry e = new WAL.Entry( 188 new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.valueOf("test"), -1, -1, uuid), we); 189 assertTrue(wef.filter(e) == e); 190 // Test system WAL edit. 191 e = new WAL.Entry( 192 new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.META_TABLE_NAME, -1, -1, uuid), we); 193 assertNull(wef.filter(e)); 194 } finally { 195 rs.terminate("Done"); 196 rss.stop("Done"); 197 } 198 } 199 200 /** 201 * Sanity check that we can move logs around while we are reading from them. Should this test 202 * fail, ReplicationSource would have a hard time reading logs that are being archived. 203 */ 204 // This tests doesn't belong in here... it is not about ReplicationSource. 205 @Test 206 public void testLogMoving() throws Exception { 207 Path logPath = new Path(logDir, "log"); 208 if (!FS.exists(logDir)) { 209 FS.mkdirs(logDir); 210 } 211 if (!FS.exists(oldLogDir)) { 212 FS.mkdirs(oldLogDir); 213 } 214 WALProvider.Writer writer = 215 WALFactory.createWALWriter(FS, logPath, TEST_UTIL.getConfiguration()); 216 for (int i = 0; i < 3; i++) { 217 byte[] b = Bytes.toBytes(Integer.toString(i)); 218 KeyValue kv = new KeyValue(b, b, b); 219 WALEdit edit = new WALEdit(); 220 edit.add(kv); 221 WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID); 222 writer.append(new WAL.Entry(key, edit)); 223 writer.sync(false); 224 } 225 writer.close(); 226 227 WALStreamReader reader = 228 WALFactory.createStreamReader(FS, logPath, TEST_UTIL.getConfiguration()); 229 WAL.Entry entry = reader.next(); 230 assertNotNull(entry); 231 232 Path oldLogPath = new Path(oldLogDir, "log"); 233 FS.rename(logPath, oldLogPath); 234 235 entry = reader.next(); 236 assertNotNull(entry); 237 238 reader.next(); 239 entry = reader.next(); 240 241 assertNull(entry); 242 reader.close(); 243 } 244 245 /** 246 * Tests that {@link ReplicationSource#terminate(String)} will timeout properly Moved here from 247 * TestReplicationSource because doesn't need cluster. 248 */ 249 @Test 250 public void testTerminateTimeout() throws Exception { 251 ReplicationSource source = new ReplicationSource(); 252 ReplicationEndpoint replicationEndpoint = new DoNothingReplicationEndpoint(); 253 try { 254 replicationEndpoint.start(); 255 ReplicationPeer mockPeer = mock(ReplicationPeer.class); 256 when(mockPeer.getPeerBandwidth()).thenReturn(0L); 257 Configuration testConf = HBaseConfiguration.create(); 258 testConf.setInt("replication.source.maxretriesmultiplier", 1); 259 ReplicationSourceManager manager = mock(ReplicationSourceManager.class); 260 source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null, 261 p -> OptionalLong.empty(), null); 262 ExecutorService executor = Executors.newSingleThreadExecutor(); 263 Future<?> future = executor.submit(() -> source.terminate("testing source termination")); 264 long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000); 265 Waiter.waitFor(testConf, sleepForRetries * 2, (Waiter.Predicate<Exception>) future::isDone); 266 } finally { 267 replicationEndpoint.stop(); 268 } 269 } 270 271 @Test 272 public void testTerminateClearsBuffer() throws Exception { 273 ReplicationSource source = new ReplicationSource(); 274 ReplicationSourceManager mockManager = new ReplicationSourceManager(null, null, conf, null, 275 null, null, null, null, null, mock(MetricsReplicationGlobalSourceSource.class)); 276 ReplicationPeer mockPeer = mock(ReplicationPeer.class); 277 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 278 Configuration testConf = HBaseConfiguration.create(); 279 source.init(testConf, null, mockManager, null, mockPeer, Mockito.mock(Server.class), "testPeer", 280 null, p -> OptionalLong.empty(), mock(MetricsSource.class)); 281 ReplicationSourceWALReader reader = 282 new ReplicationSourceWALReader(null, conf, null, 0, null, source, null); 283 ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, null, source); 284 shipper.entryReader = reader; 285 source.workerThreads.put("testPeer", shipper); 286 WALEntryBatch batch = new WALEntryBatch(10, logDir); 287 WAL.Entry mockEntry = mock(WAL.Entry.class); 288 WALEdit mockEdit = mock(WALEdit.class); 289 WALKeyImpl mockKey = mock(WALKeyImpl.class); 290 when(mockEntry.getEdit()).thenReturn(mockEdit); 291 when(mockEdit.isEmpty()).thenReturn(false); 292 when(mockEntry.getKey()).thenReturn(mockKey); 293 when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L); 294 when(mockEdit.heapSize()).thenReturn(10000L); 295 when(mockEdit.size()).thenReturn(0); 296 ArrayList<Cell> cells = new ArrayList<>(); 297 KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"), Bytes.toBytes("1"), 298 Bytes.toBytes("v1")); 299 cells.add(kv); 300 when(mockEdit.getCells()).thenReturn(cells); 301 reader.addEntryToBatch(batch, mockEntry); 302 reader.entryBatchQueue.put(batch); 303 source.terminate("test"); 304 assertEquals(0, source.getSourceManager().getTotalBufferUsed()); 305 } 306 307 /** 308 * Tests that recovered queues are preserved on a regionserver shutdown. See HBASE-18192 309 */ 310 @Test 311 public void testServerShutdownRecoveredQueue() throws Exception { 312 try { 313 // Ensure single-threaded WAL 314 conf.set("hbase.wal.provider", "defaultProvider"); 315 conf.setInt("replication.sleep.before.failover", 2000); 316 // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in. 317 conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName()); 318 MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2); 319 TEST_UTIL_PEER.startMiniCluster(1); 320 321 HRegionServer serverA = cluster.getRegionServer(0); 322 final ReplicationSourceManager managerA = 323 serverA.getReplicationSourceService().getReplicationManager(); 324 HRegionServer serverB = cluster.getRegionServer(1); 325 final ReplicationSourceManager managerB = 326 serverB.getReplicationSourceService().getReplicationManager(); 327 final Admin admin = TEST_UTIL.getAdmin(); 328 329 final String peerId = "TestPeer"; 330 admin.addReplicationPeer(peerId, 331 ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build()); 332 // Wait for replication sources to come up 333 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 334 @Override 335 public boolean evaluate() { 336 return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty()); 337 } 338 }); 339 // Disabling peer makes sure there is at least one log to claim when the server dies 340 // The recovered queue will also stay there until the peer is disabled even if the 341 // WALs it contains have no data. 342 admin.disableReplicationPeer(peerId); 343 344 // Stopping serverA 345 // It's queues should be claimed by the only other alive server i.e. serverB 346 cluster.stopRegionServer(serverA.getServerName()); 347 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 348 @Override 349 public boolean evaluate() throws Exception { 350 return managerB.getOldSources().size() == 1; 351 } 352 }); 353 354 final HRegionServer serverC = cluster.startRegionServer().getRegionServer(); 355 serverC.waitForServerOnline(); 356 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 357 @Override 358 public boolean evaluate() throws Exception { 359 return serverC.getReplicationSourceService() != null; 360 } 361 }); 362 final ReplicationSourceManager managerC = 363 ((Replication) serverC.getReplicationSourceService()).getReplicationManager(); 364 // Sanity check 365 assertEquals(0, managerC.getOldSources().size()); 366 367 // Stopping serverB 368 // Now serverC should have two recovered queues: 369 // 1. The serverB's normal queue 370 // 2. serverA's recovered queue on serverB 371 cluster.stopRegionServer(serverB.getServerName()); 372 Waiter.waitFor(conf, 20000, 373 (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 2); 374 admin.enableReplicationPeer(peerId); 375 Waiter.waitFor(conf, 20000, 376 (Waiter.Predicate<Exception>) () -> managerC.getOldSources().size() == 0); 377 } finally { 378 conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName()); 379 } 380 } 381 382 /** 383 * Regionserver implementation that adds a delay on the graceful shutdown. 384 */ 385 public static class ShutdownDelayRegionServer extends HRegionServer { 386 public ShutdownDelayRegionServer(Configuration conf) throws IOException { 387 super(conf); 388 } 389 390 @Override 391 protected void stopServiceThreads() { 392 // Add a delay before service threads are shutdown. 393 // This will keep the zookeeper connection alive for the duration of the delay. 394 LOG.info("Adding a delay to the regionserver shutdown"); 395 try { 396 Thread.sleep(2000); 397 } catch (InterruptedException ex) { 398 LOG.error("Interrupted while sleeping"); 399 } 400 super.stopServiceThreads(); 401 } 402 } 403 404 /** 405 * Deadend Endpoint. Does nothing. 406 */ 407 public static class DoNothingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint { 408 private final UUID uuid = UUID.randomUUID(); 409 410 @Override 411 public void init(Context context) throws IOException { 412 this.ctx = context; 413 } 414 415 @Override 416 public WALEntryFilter getWALEntryfilter() { 417 return null; 418 } 419 420 @Override 421 public synchronized UUID getPeerUUID() { 422 return this.uuid; 423 } 424 425 @Override 426 protected void doStart() { 427 notifyStarted(); 428 } 429 430 @Override 431 protected void doStop() { 432 notifyStopped(); 433 } 434 435 @Override 436 public boolean canReplicateToSameCluster() { 437 return true; 438 } 439 } 440 441 /** 442 * Deadend Endpoint. Does nothing. 443 */ 444 public static class FlakyReplicationEndpoint extends DoNothingReplicationEndpoint { 445 446 static int count = 0; 447 448 @Override 449 public synchronized UUID getPeerUUID() { 450 if (count == 0) { 451 count++; 452 throw new RuntimeException(); 453 } else { 454 return super.getPeerUUID(); 455 } 456 } 457 458 } 459 460 /** 461 * Bad Endpoint with failing connection to peer on demand. 462 */ 463 public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint { 464 static boolean failing = true; 465 466 @Override 467 public synchronized UUID getPeerUUID() { 468 return failing ? null : super.getPeerUUID(); 469 } 470 } 471 472 public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint { 473 474 static int count = 0; 475 476 @Override 477 public synchronized UUID getPeerUUID() { 478 throw new RuntimeException(); 479 } 480 481 } 482 483 /** 484 * Test HBASE-20497 Moved here from TestReplicationSource because doesn't need cluster. 485 */ 486 @Test 487 public void testRecoveredReplicationSourceShipperGetPosition() throws Exception { 488 String walGroupId = "fake-wal-group-id"; 489 ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L); 490 ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L); 491 RecoveredReplicationSource source = mock(RecoveredReplicationSource.class); 492 Server server = mock(Server.class); 493 when(server.getServerName()).thenReturn(serverName); 494 when(source.getServer()).thenReturn(server); 495 when(source.getServerWALsBelongTo()).thenReturn(deadServer); 496 ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class); 497 when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any())) 498 .thenReturn(1001L); 499 when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any())) 500 .thenReturn(-1L); 501 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 502 conf.setInt("replication.source.maxretriesmultiplier", -1); 503 MetricsSource metricsSource = mock(MetricsSource.class); 504 doNothing().when(metricsSource).incrSizeOfLogQueue(); 505 ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metricsSource, source); 506 logQueue.enqueueLog(new Path("/www/html/test"), walGroupId); 507 RecoveredReplicationSourceShipper shipper = 508 new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, source, storage); 509 assertEquals(1001L, shipper.getStartPosition()); 510 } 511 512 private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf, 513 String endpointName) throws IOException { 514 conf.setInt("replication.source.maxretriesmultiplier", 1); 515 ReplicationPeer mockPeer = mock(ReplicationPeer.class); 516 when(mockPeer.getConfiguration()).thenReturn(conf); 517 when(mockPeer.getPeerBandwidth()).thenReturn(0L); 518 ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); 519 FaultyReplicationEndpoint.count = 0; 520 when(peerConfig.getReplicationEndpointImpl()).thenReturn(endpointName); 521 when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 522 ReplicationSourceManager manager = mock(ReplicationSourceManager.class); 523 Mockito.when(manager.getGlobalMetrics()) 524 .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); 525 String queueId = "qid"; 526 RegionServerServices rss = 527 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 528 rs.init(conf, null, manager, null, mockPeer, rss, queueId, null, p -> OptionalLong.empty(), 529 new MetricsSource(queueId)); 530 return rss; 531 } 532 533 /** 534 * Test ReplicationSource retries startup once an uncaught exception happens during initialization 535 * and <b>eplication.source.regionserver.abort</b> is set to false. 536 */ 537 @Test 538 public void testAbortFalseOnError() throws IOException { 539 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 540 conf.setBoolean("replication.source.regionserver.abort", false); 541 ReplicationSource rs = new ReplicationSource(); 542 RegionServerServices rss = 543 setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName()); 544 try { 545 rs.startup(); 546 assertTrue(rs.isSourceActive()); 547 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 548 rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID)); 549 assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue()); 550 rs.enqueueLog(new Path("a.1")); 551 assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue()); 552 } finally { 553 rs.terminate("Done"); 554 rss.stop("Done"); 555 } 556 } 557 558 @Test 559 public void testReplicationSourceInitializingMetric() throws IOException { 560 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 561 conf.setBoolean("replication.source.regionserver.abort", false); 562 ReplicationSource rs = new ReplicationSource(); 563 RegionServerServices rss = setupForAbortTests(rs, conf, BadReplicationEndpoint.class.getName()); 564 try { 565 rs.startup(); 566 assertTrue(rs.isSourceActive()); 567 Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 1); 568 BadReplicationEndpoint.failing = false; 569 Waiter.waitFor(conf, 10000, () -> rs.getSourceMetrics().getSourceInitializing() == 0); 570 } finally { 571 rs.terminate("Done"); 572 rss.stop("Done"); 573 } 574 } 575 576 /** 577 * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread, 578 * when <b>replication.source.regionserver.abort</b> is set to false. 579 */ 580 @Test 581 public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException { 582 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 583 ReplicationSource rs = new ReplicationSource(); 584 RegionServerServices rss = 585 setupForAbortTests(rs, conf, FaultyReplicationEndpoint.class.getName()); 586 try { 587 rs.startup(); 588 assertTrue(true); 589 } finally { 590 rs.terminate("Done"); 591 rss.stop("Done"); 592 } 593 } 594 595 /** 596 * Test ReplicationSource retries startup once an uncaught exception happens during initialization 597 * and <b>replication.source.regionserver.abort</b> is set to true. 598 */ 599 @Test 600 public void testAbortTrueOnError() throws IOException { 601 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 602 ReplicationSource rs = new ReplicationSource(); 603 RegionServerServices rss = 604 setupForAbortTests(rs, conf, FlakyReplicationEndpoint.class.getName()); 605 try { 606 rs.startup(); 607 assertTrue(rs.isSourceActive()); 608 Waiter.waitFor(conf, 1000, () -> rss.isAborted()); 609 assertTrue(rss.isAborted()); 610 Waiter.waitFor(conf, 1000, () -> !rs.isSourceActive()); 611 assertFalse(rs.isSourceActive()); 612 } finally { 613 rs.terminate("Done"); 614 rss.stop("Done"); 615 } 616 } 617 618 /* 619 * Test age of oldest wal metric. 620 */ 621 @Test 622 public void testAgeOfOldestWal() throws Exception { 623 try { 624 ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge(); 625 EnvironmentEdgeManager.injectEdge(manualEdge); 626 627 String id = "1"; 628 MetricsSource metrics = new MetricsSource(id); 629 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 630 conf.setInt("replication.source.maxretriesmultiplier", 1); 631 ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class); 632 Mockito.when(mockPeer.getConfiguration()).thenReturn(conf); 633 Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); 634 ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class); 635 Mockito.when(peerConfig.getReplicationEndpointImpl()) 636 .thenReturn(DoNothingReplicationEndpoint.class.getName()); 637 Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); 638 ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); 639 Mockito.when(manager.getGlobalMetrics()) 640 .thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); 641 RegionServerServices rss = 642 TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); 643 644 ReplicationSource source = new ReplicationSource(); 645 source.init(conf, null, manager, null, mockPeer, rss, id, null, p -> OptionalLong.empty(), 646 metrics); 647 648 final Path log1 = new Path(logDir, "log-walgroup-a.8"); 649 manualEdge.setValue(10); 650 // Diff of current time (10) and log-walgroup-a.8 timestamp will be 2. 651 source.enqueueLog(log1); 652 MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id); 653 assertEquals(2, metricsSource1.getOldestWalAge()); 654 655 final Path log2 = new Path(logDir, "log-walgroup-b.4"); 656 // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6 657 source.enqueueLog(log2); 658 assertEquals(6, metricsSource1.getOldestWalAge()); 659 // Clear all metrics. 660 metrics.clear(); 661 } finally { 662 EnvironmentEdgeManager.reset(); 663 } 664 } 665 666 private MetricsReplicationSourceSource getSourceMetrics(String sourceId) { 667 MetricsReplicationSourceFactory factory = 668 CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class); 669 return factory.getSource(sourceId); 670 } 671}