001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.mockito.Mockito.doNothing; 021import static org.mockito.Mockito.mock; 022import static org.mockito.Mockito.spy; 023import static org.mockito.Mockito.verify; 024import static org.mockito.Mockito.when; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.TreeMap; 032import java.util.UUID; 033import java.util.concurrent.CompletableFuture; 034import java.util.concurrent.atomic.AtomicBoolean; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.concurrent.atomic.AtomicReference; 037import org.apache.hadoop.hbase.Cell; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.KeyValue; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.Waiter; 042import org.apache.hadoop.hbase.client.Connection; 043import org.apache.hadoop.hbase.client.ConnectionFactory; 044import org.apache.hadoop.hbase.client.Put; 045import org.apache.hadoop.hbase.client.RegionInfo; 046import org.apache.hadoop.hbase.client.Table; 047import org.apache.hadoop.hbase.regionserver.HRegion; 048import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; 049import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource; 050import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl; 051import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl; 052import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource; 053import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl; 054import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource; 055import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; 056import org.apache.hadoop.hbase.testclassification.MediumTests; 057import org.apache.hadoop.hbase.testclassification.ReplicationTests; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 060import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 061import org.apache.hadoop.hbase.util.Pair; 062import org.apache.hadoop.hbase.util.Threads; 063import org.apache.hadoop.hbase.wal.WAL.Entry; 064import org.apache.hadoop.hbase.wal.WALEdit; 065import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 066import org.apache.hadoop.hbase.wal.WALKeyImpl; 067import org.apache.hadoop.hbase.zookeeper.ZKConfig; 068import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; 069import org.junit.AfterClass; 070import org.junit.Assert; 071import org.junit.Before; 072import org.junit.BeforeClass; 073import org.junit.ClassRule; 074import org.junit.Test; 075import org.junit.experimental.categories.Category; 076import org.slf4j.Logger; 077import org.slf4j.LoggerFactory; 078 079/** 080 * Tests ReplicationSource and ReplicationEndpoint interactions 081 */ 082@Category({ ReplicationTests.class, MediumTests.class }) 083public class TestReplicationEndpoint extends TestReplicationBase { 084 085 @ClassRule 086 public static final HBaseClassTestRule CLASS_RULE = 087 HBaseClassTestRule.forClass(TestReplicationEndpoint.class); 088 089 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationEndpoint.class); 090 091 static int numRegionServers; 092 093 @BeforeClass 094 public static void setUpBeforeClass() throws Exception { 095 TestReplicationBase.setUpBeforeClass(); 096 numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size(); 097 } 098 099 @AfterClass 100 public static void tearDownAfterClass() throws Exception { 101 TestReplicationBase.tearDownAfterClass(); 102 // check stop is called 103 Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0); 104 } 105 106 @Before 107 public void setup() throws Exception { 108 ReplicationEndpointForTest.contructedCount.set(0); 109 ReplicationEndpointForTest.startedCount.set(0); 110 ReplicationEndpointForTest.replicateCount.set(0); 111 ReplicationEndpointReturningFalse.replicated.set(false); 112 ReplicationEndpointForTest.lastEntries = null; 113 final List<RegionServerThread> rsThreads = UTIL1.getMiniHBaseCluster().getRegionServerThreads(); 114 for (RegionServerThread rs : rsThreads) { 115 UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName()); 116 } 117 // Wait for all log roll to finish 118 UTIL1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() { 119 @Override 120 public boolean evaluate() throws Exception { 121 for (RegionServerThread rs : rsThreads) { 122 if (!rs.getRegionServer().walRollRequestFinished()) { 123 return false; 124 } 125 } 126 return true; 127 } 128 129 @Override 130 public String explainFailure() throws Exception { 131 List<String> logRollInProgressRsList = new ArrayList<>(); 132 for (RegionServerThread rs : rsThreads) { 133 if (!rs.getRegionServer().walRollRequestFinished()) { 134 logRollInProgressRsList.add(rs.getRegionServer().toString()); 135 } 136 } 137 return "Still waiting for log roll on regionservers: " + logRollInProgressRsList; 138 } 139 }); 140 } 141 142 @Test 143 public void testCustomReplicationEndpoint() throws Exception { 144 // test installing a custom replication endpoint other than the default one. 145 hbaseAdmin.addReplicationPeer("testCustomReplicationEndpoint", 146 ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) 147 .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()).build()); 148 149 // check whether the class has been constructed and started 150 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { 151 @Override 152 public boolean evaluate() throws Exception { 153 return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers; 154 } 155 }); 156 157 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { 158 @Override 159 public boolean evaluate() throws Exception { 160 return ReplicationEndpointForTest.startedCount.get() >= numRegionServers; 161 } 162 }); 163 164 Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get()); 165 166 // now replicate some data. 167 doPut(Bytes.toBytes("row42")); 168 169 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { 170 @Override 171 public boolean evaluate() throws Exception { 172 return ReplicationEndpointForTest.replicateCount.get() >= 1; 173 } 174 }); 175 176 doAssert(Bytes.toBytes("row42")); 177 178 hbaseAdmin.removeReplicationPeer("testCustomReplicationEndpoint"); 179 } 180 181 @Test 182 public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception { 183 Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get()); 184 Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get()); 185 int peerCount = hbaseAdmin.listReplicationPeers().size(); 186 final String id = "testReplicationEndpointReturnsFalseOnReplicate"; 187 hbaseAdmin.addReplicationPeer(id, 188 ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) 189 .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()).build()); 190 // This test is flakey and then there is so much stuff flying around in here its, hard to 191 // debug. Peer needs to be up for the edit to make it across. This wait on 192 // peer count seems to be a hack that has us not progress till peer is up. 193 if (hbaseAdmin.listReplicationPeers().size() <= peerCount) { 194 LOG.info("Waiting on peercount to go up from " + peerCount); 195 Threads.sleep(100); 196 } 197 // now replicate some data 198 doPut(row); 199 200 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { 201 @Override 202 public boolean evaluate() throws Exception { 203 // Looks like replication endpoint returns false unless we put more than 10 edits. We 204 // only send over one edit. 205 int count = ReplicationEndpointForTest.replicateCount.get(); 206 LOG.info("count=" + count); 207 return ReplicationEndpointReturningFalse.replicated.get(); 208 } 209 }); 210 if (ReplicationEndpointReturningFalse.ex.get() != null) { 211 throw ReplicationEndpointReturningFalse.ex.get(); 212 } 213 214 hbaseAdmin.removeReplicationPeer("testReplicationEndpointReturnsFalseOnReplicate"); 215 } 216 217 @Test 218 public void testInterClusterReplication() throws Exception { 219 final String id = "testInterClusterReplication"; 220 221 List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName); 222 int totEdits = 0; 223 224 // Make sure edits are spread across regions because we do region based batching 225 // before shipping edits. 226 for (HRegion region : regions) { 227 RegionInfo hri = region.getRegionInfo(); 228 byte[] row = hri.getStartKey(); 229 for (int i = 0; i < 100; i++) { 230 if (row.length > 0) { 231 Put put = new Put(row); 232 put.addColumn(famName, row, row); 233 region.put(put); 234 totEdits++; 235 } 236 } 237 } 238 239 hbaseAdmin.addReplicationPeer(id, 240 ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2)) 241 .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()) 242 .build()); 243 244 final int numEdits = totEdits; 245 Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() { 246 @Override 247 public boolean evaluate() throws Exception { 248 return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits; 249 } 250 251 @Override 252 public String explainFailure() throws Exception { 253 String failure = "Failed to replicate all edits, expected = " + numEdits + " replicated = " 254 + InterClusterReplicationEndpointForTest.replicateCount.get(); 255 return failure; 256 } 257 }); 258 259 hbaseAdmin.removeReplicationPeer("testInterClusterReplication"); 260 UTIL1.deleteTableData(tableName); 261 } 262 263 @Test 264 public void testWALEntryFilterFromReplicationEndpoint() throws Exception { 265 ReplicationPeerConfig rpc = 266 ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) 267 .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()) 268 // test that we can create mutliple WALFilters reflectively 269 .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, 270 EverythingPassesWALEntryFilter.class.getName() + "," 271 + EverythingPassesWALEntryFilterSubclass.class.getName()) 272 .build(); 273 274 hbaseAdmin.addReplicationPeer("testWALEntryFilterFromReplicationEndpoint", rpc); 275 // now replicate some data. 276 try (Connection connection = ConnectionFactory.createConnection(CONF1)) { 277 doPut(connection, Bytes.toBytes("row1")); 278 doPut(connection, row); 279 doPut(connection, Bytes.toBytes("row2")); 280 } 281 282 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() { 283 @Override 284 public boolean evaluate() throws Exception { 285 return ReplicationEndpointForTest.replicateCount.get() >= 1; 286 } 287 }); 288 289 Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get()); 290 // make sure our reflectively created filter is in the filter chain 291 Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry()); 292 hbaseAdmin.removeReplicationPeer("testWALEntryFilterFromReplicationEndpoint"); 293 } 294 295 @Test(expected = IOException.class) 296 public void testWALEntryFilterAddValidation() throws Exception { 297 ReplicationPeerConfig rpc = 298 ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) 299 .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()) 300 // test that we can create mutliple WALFilters reflectively 301 .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, 302 "IAmNotARealWalEntryFilter") 303 .build(); 304 hbaseAdmin.addReplicationPeer("testWALEntryFilterAddValidation", rpc); 305 } 306 307 @Test(expected = IOException.class) 308 public void testWALEntryFilterUpdateValidation() throws Exception { 309 ReplicationPeerConfig rpc = 310 ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1)) 311 .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()) 312 // test that we can create mutliple WALFilters reflectively 313 .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, 314 "IAmNotARealWalEntryFilter") 315 .build(); 316 hbaseAdmin.updateReplicationPeerConfig("testWALEntryFilterUpdateValidation", rpc); 317 } 318 319 @Test 320 public void testMetricsSourceBaseSourcePassThrough() { 321 /* 322 * The replication MetricsSource wraps a MetricsReplicationTableSourceImpl, 323 * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource, so that 324 * metrics get written to both namespaces. Both of those classes wrap a 325 * MetricsReplicationSourceImpl that implements BaseSource, which allows for custom JMX metrics. 326 * This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls 327 * down through the two layers of wrapping to the actual BaseSource. 328 */ 329 String id = "id"; 330 DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class); 331 MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class); 332 when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry); 333 MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class); 334 when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry); 335 336 MetricsReplicationSourceSource singleSourceSource = 337 new MetricsReplicationSourceSourceImpl(singleRms, id); 338 MetricsReplicationGlobalSourceSource globalSourceSource = 339 new MetricsReplicationGlobalSourceSourceImpl(globalRms); 340 MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource); 341 doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue(); 342 343 Map<String, MetricsReplicationTableSource> singleSourceSourceByTable = new HashMap<>(); 344 MetricsSource source = 345 new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable); 346 347 String gaugeName = "gauge"; 348 String singleGaugeName = "source.id." + gaugeName; 349 String globalGaugeName = "source." + gaugeName; 350 long delta = 1; 351 String counterName = "counter"; 352 String singleCounterName = "source.id." + counterName; 353 String globalCounterName = "source." + counterName; 354 long count = 2; 355 source.decGauge(gaugeName, delta); 356 source.getMetricsContext(); 357 source.getMetricsDescription(); 358 source.getMetricsJmxContext(); 359 source.getMetricsName(); 360 source.incCounters(counterName, count); 361 source.incGauge(gaugeName, delta); 362 source.init(); 363 source.removeMetric(gaugeName); 364 source.setGauge(gaugeName, delta); 365 source.updateHistogram(counterName, count); 366 source.incrFailedRecoveryQueue(); 367 368 verify(singleRms).decGauge(singleGaugeName, delta); 369 verify(globalRms).decGauge(globalGaugeName, delta); 370 verify(globalRms).getMetricsContext(); 371 verify(globalRms).getMetricsJmxContext(); 372 verify(globalRms).getMetricsName(); 373 verify(singleRms).incCounters(singleCounterName, count); 374 verify(globalRms).incCounters(globalCounterName, count); 375 verify(singleRms).incGauge(singleGaugeName, delta); 376 verify(globalRms).incGauge(globalGaugeName, delta); 377 verify(globalRms).init(); 378 verify(singleRms).removeMetric(singleGaugeName); 379 verify(globalRms).removeMetric(globalGaugeName); 380 verify(singleRms).setGauge(singleGaugeName, delta); 381 verify(globalRms).setGauge(globalGaugeName, delta); 382 verify(singleRms).updateHistogram(singleCounterName, count); 383 verify(globalRms).updateHistogram(globalCounterName, count); 384 verify(spyglobalSourceSource).incrFailedRecoveryQueue(); 385 386 // check singleSourceSourceByTable metrics. 387 // singleSourceSourceByTable map entry will be created only 388 // after calling #setAgeOfLastShippedOpByTable 389 boolean containsRandomNewTable = 390 source.getSingleSourceSourceByTable().containsKey("RandomNewTable"); 391 Assert.assertEquals(false, containsRandomNewTable); 392 source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable")); 393 containsRandomNewTable = source.getSingleSourceSourceByTable().containsKey("RandomNewTable"); 394 Assert.assertEquals(true, containsRandomNewTable); 395 MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable().get("RandomNewTable"); 396 397 // age should be greater than zero we created the entry with time in the past 398 Assert.assertTrue(msr.getLastShippedAge() > 0); 399 Assert.assertTrue(msr.getShippedBytes() > 0); 400 401 } 402 403 private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) { 404 List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>(); 405 byte[] a = new byte[] { 'a' }; 406 Entry entry = createEntry(tableName, null, a); 407 walEntriesWithSize.add(new Pair<>(entry, 10L)); 408 return walEntriesWithSize; 409 } 410 411 private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... kvs) { 412 WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName), 413 EnvironmentEdgeManager.currentTime() - 1L, scopes); 414 WALEdit edit1 = new WALEdit(); 415 416 for (byte[] kv : kvs) { 417 WALEditInternalHelper.addExtendedCell(edit1, new KeyValue(kv, kv, kv)); 418 } 419 return new Entry(key1, edit1); 420 } 421 422 private void doPut(byte[] row) throws IOException { 423 try (Connection connection = ConnectionFactory.createConnection(CONF1)) { 424 doPut(connection, row); 425 } 426 } 427 428 private void doPut(final Connection connection, final byte[] row) throws IOException { 429 try (Table t = connection.getTable(tableName)) { 430 Put put = new Put(row); 431 put.addColumn(famName, row, row); 432 t.put(put); 433 } 434 } 435 436 private static void doAssert(byte[] row) throws Exception { 437 if (ReplicationEndpointForTest.lastEntries == null) { 438 return; // first call 439 } 440 Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size()); 441 List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells(); 442 Assert.assertEquals(1, cells.size()); 443 Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(), 444 cells.get(0).getRowLength(), row, 0, row.length)); 445 } 446 447 public static class ReplicationEndpointForTest extends BaseReplicationEndpoint { 448 static UUID uuid = UTIL1.getRandomUUID(); 449 static AtomicInteger contructedCount = new AtomicInteger(); 450 static AtomicInteger startedCount = new AtomicInteger(); 451 static AtomicInteger stoppedCount = new AtomicInteger(); 452 static AtomicInteger replicateCount = new AtomicInteger(); 453 static volatile List<Entry> lastEntries = null; 454 455 public ReplicationEndpointForTest() { 456 replicateCount.set(0); 457 contructedCount.incrementAndGet(); 458 } 459 460 @Override 461 public UUID getPeerUUID() { 462 return uuid; 463 } 464 465 @Override 466 public boolean replicate(ReplicateContext replicateContext) { 467 replicateCount.incrementAndGet(); 468 lastEntries = new ArrayList<>(replicateContext.entries); 469 return true; 470 } 471 472 @Override 473 public void start() { 474 startAsync(); 475 } 476 477 @Override 478 public void stop() { 479 stopAsync(); 480 } 481 482 @Override 483 protected void doStart() { 484 startedCount.incrementAndGet(); 485 notifyStarted(); 486 } 487 488 @Override 489 protected void doStop() { 490 stoppedCount.incrementAndGet(); 491 notifyStopped(); 492 } 493 494 @Override 495 public boolean canReplicateToSameCluster() { 496 return true; 497 } 498 } 499 500 /** 501 * Not used by unit tests, helpful for manual testing with replication. 502 * <p> 503 * Snippet for `hbase shell`: 504 * 505 * <pre> 506 * create 't', 'f' 507 * add_peer '1', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.replication.' + \ 508 * 'TestReplicationEndpoint$SleepingReplicationEndpointForTest' 509 * alter 't', {NAME=>'f', REPLICATION_SCOPE=>1} 510 * </pre> 511 */ 512 public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest { 513 private long duration; 514 515 public SleepingReplicationEndpointForTest() { 516 super(); 517 } 518 519 @Override 520 public void init(Context context) throws IOException { 521 super.init(context); 522 if (this.ctx != null) { 523 duration = this.ctx.getConfiguration() 524 .getLong("hbase.test.sleep.replication.endpoint.duration.millis", 5000L); 525 } 526 } 527 528 @Override 529 public boolean replicate(ReplicateContext context) { 530 try { 531 Thread.sleep(duration); 532 } catch (InterruptedException e) { 533 Thread.currentThread().interrupt(); 534 return false; 535 } 536 return super.replicate(context); 537 } 538 } 539 540 public static class InterClusterReplicationEndpointForTest 541 extends HBaseInterClusterReplicationEndpoint { 542 543 static AtomicInteger replicateCount = new AtomicInteger(); 544 static boolean failedOnce; 545 546 public InterClusterReplicationEndpointForTest() { 547 replicateCount.set(0); 548 } 549 550 @Override 551 public boolean replicate(ReplicateContext replicateContext) { 552 boolean success = super.replicate(replicateContext); 553 if (success) { 554 replicateCount.addAndGet(replicateContext.entries.size()); 555 } 556 return success; 557 } 558 559 @Override 560 protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal, 561 int timeout) { 562 // Fail only once, we don't want to slow down the test. 563 if (failedOnce) { 564 return CompletableFuture.completedFuture(ordinal); 565 } else { 566 failedOnce = true; 567 CompletableFuture<Integer> future = new CompletableFuture<Integer>(); 568 future.completeExceptionally(new IOException("Sample Exception: Failed to replicate.")); 569 return future; 570 } 571 } 572 } 573 574 public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest { 575 static int COUNT = 10; 576 static AtomicReference<Exception> ex = new AtomicReference<>(null); 577 static AtomicBoolean replicated = new AtomicBoolean(false); 578 579 @Override 580 public boolean replicate(ReplicateContext replicateContext) { 581 try { 582 // check row 583 doAssert(row); 584 } catch (Exception e) { 585 ex.set(e); 586 } 587 588 super.replicate(replicateContext); 589 LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get()); 590 591 replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false 592 return replicated.get(); 593 } 594 } 595 596 // return a WALEntry filter which only accepts "row", but not other rows 597 public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest { 598 static AtomicReference<Exception> ex = new AtomicReference<>(null); 599 600 @Override 601 public boolean replicate(ReplicateContext replicateContext) { 602 try { 603 super.replicate(replicateContext); 604 doAssert(row); 605 } catch (Exception e) { 606 ex.set(e); 607 } 608 return true; 609 } 610 611 @Override 612 public WALEntryFilter getWALEntryfilter() { 613 return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() { 614 @Override 615 public Entry filter(Entry entry) { 616 ArrayList<Cell> cells = entry.getEdit().getCells(); 617 int size = cells.size(); 618 for (int i = size - 1; i >= 0; i--) { 619 Cell cell = cells.get(i); 620 if ( 621 !Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), row, 0, 622 row.length) 623 ) { 624 cells.remove(i); 625 } 626 } 627 return entry; 628 } 629 }); 630 } 631 } 632 633 public static class EverythingPassesWALEntryFilter implements WALEntryFilter { 634 private static boolean passedEntry = false; 635 636 @Override 637 public Entry filter(Entry entry) { 638 passedEntry = true; 639 return entry; 640 } 641 642 public static boolean hasPassedAnEntry() { 643 return passedEntry; 644 } 645 } 646 647 public static class EverythingPassesWALEntryFilterSubclass 648 extends EverythingPassesWALEntryFilter { 649 } 650}