001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.mockito.Mockito.doNothing;
021import static org.mockito.Mockito.mock;
022import static org.mockito.Mockito.spy;
023import static org.mockito.Mockito.verify;
024import static org.mockito.Mockito.when;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.TreeMap;
032import java.util.UUID;
033import java.util.concurrent.CompletableFuture;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.concurrent.atomic.AtomicReference;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.KeyValue;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.Waiter;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.ConnectionFactory;
044import org.apache.hadoop.hbase.client.Put;
045import org.apache.hadoop.hbase.client.RegionInfo;
046import org.apache.hadoop.hbase.client.Table;
047import org.apache.hadoop.hbase.regionserver.HRegion;
048import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
049import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
050import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl;
051import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
052import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
053import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
054import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource;
055import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
056import org.apache.hadoop.hbase.testclassification.MediumTests;
057import org.apache.hadoop.hbase.testclassification.ReplicationTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
061import org.apache.hadoop.hbase.util.Pair;
062import org.apache.hadoop.hbase.util.Threads;
063import org.apache.hadoop.hbase.wal.WAL.Entry;
064import org.apache.hadoop.hbase.wal.WALEdit;
065import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
066import org.apache.hadoop.hbase.wal.WALKeyImpl;
067import org.apache.hadoop.hbase.zookeeper.ZKConfig;
068import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
069import org.junit.AfterClass;
070import org.junit.Assert;
071import org.junit.Before;
072import org.junit.BeforeClass;
073import org.junit.ClassRule;
074import org.junit.Test;
075import org.junit.experimental.categories.Category;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078
079/**
080 * Tests ReplicationSource and ReplicationEndpoint interactions
081 */
082@Category({ ReplicationTests.class, MediumTests.class })
083public class TestReplicationEndpoint extends TestReplicationBase {
084
085  @ClassRule
086  public static final HBaseClassTestRule CLASS_RULE =
087    HBaseClassTestRule.forClass(TestReplicationEndpoint.class);
088
089  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationEndpoint.class);
090
091  static int numRegionServers;
092
093  @BeforeClass
094  public static void setUpBeforeClass() throws Exception {
095    TestReplicationBase.setUpBeforeClass();
096    numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size();
097  }
098
099  @AfterClass
100  public static void tearDownAfterClass() throws Exception {
101    TestReplicationBase.tearDownAfterClass();
102    // check stop is called
103    Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
104  }
105
106  @Before
107  public void setup() throws Exception {
108    ReplicationEndpointForTest.contructedCount.set(0);
109    ReplicationEndpointForTest.startedCount.set(0);
110    ReplicationEndpointForTest.replicateCount.set(0);
111    ReplicationEndpointReturningFalse.replicated.set(false);
112    ReplicationEndpointForTest.lastEntries = null;
113    final List<RegionServerThread> rsThreads = UTIL1.getMiniHBaseCluster().getRegionServerThreads();
114    for (RegionServerThread rs : rsThreads) {
115      UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
116    }
117    // Wait for all log roll to finish
118    UTIL1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
119      @Override
120      public boolean evaluate() throws Exception {
121        for (RegionServerThread rs : rsThreads) {
122          if (!rs.getRegionServer().walRollRequestFinished()) {
123            return false;
124          }
125        }
126        return true;
127      }
128
129      @Override
130      public String explainFailure() throws Exception {
131        List<String> logRollInProgressRsList = new ArrayList<>();
132        for (RegionServerThread rs : rsThreads) {
133          if (!rs.getRegionServer().walRollRequestFinished()) {
134            logRollInProgressRsList.add(rs.getRegionServer().toString());
135          }
136        }
137        return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
138      }
139    });
140  }
141
142  @Test
143  public void testCustomReplicationEndpoint() throws Exception {
144    // test installing a custom replication endpoint other than the default one.
145    hbaseAdmin.addReplicationPeer("testCustomReplicationEndpoint",
146      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
147        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()).build());
148
149    // check whether the class has been constructed and started
150    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
151      @Override
152      public boolean evaluate() throws Exception {
153        return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
154      }
155    });
156
157    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
158      @Override
159      public boolean evaluate() throws Exception {
160        return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
161      }
162    });
163
164    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
165
166    // now replicate some data.
167    doPut(Bytes.toBytes("row42"));
168
169    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
170      @Override
171      public boolean evaluate() throws Exception {
172        return ReplicationEndpointForTest.replicateCount.get() >= 1;
173      }
174    });
175
176    doAssert(Bytes.toBytes("row42"));
177
178    hbaseAdmin.removeReplicationPeer("testCustomReplicationEndpoint");
179  }
180
181  @Test
182  public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
183    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
184    Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
185    int peerCount = hbaseAdmin.listReplicationPeers().size();
186    final String id = "testReplicationEndpointReturnsFalseOnReplicate";
187    hbaseAdmin.addReplicationPeer(id,
188      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
189        .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()).build());
190    // This test is flakey and then there is so much stuff flying around in here its, hard to
191    // debug. Peer needs to be up for the edit to make it across. This wait on
192    // peer count seems to be a hack that has us not progress till peer is up.
193    if (hbaseAdmin.listReplicationPeers().size() <= peerCount) {
194      LOG.info("Waiting on peercount to go up from " + peerCount);
195      Threads.sleep(100);
196    }
197    // now replicate some data
198    doPut(row);
199
200    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
201      @Override
202      public boolean evaluate() throws Exception {
203        // Looks like replication endpoint returns false unless we put more than 10 edits. We
204        // only send over one edit.
205        int count = ReplicationEndpointForTest.replicateCount.get();
206        LOG.info("count=" + count);
207        return ReplicationEndpointReturningFalse.replicated.get();
208      }
209    });
210    if (ReplicationEndpointReturningFalse.ex.get() != null) {
211      throw ReplicationEndpointReturningFalse.ex.get();
212    }
213
214    hbaseAdmin.removeReplicationPeer("testReplicationEndpointReturnsFalseOnReplicate");
215  }
216
217  @Test
218  public void testInterClusterReplication() throws Exception {
219    final String id = "testInterClusterReplication";
220
221    List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName);
222    int totEdits = 0;
223
224    // Make sure edits are spread across regions because we do region based batching
225    // before shipping edits.
226    for (HRegion region : regions) {
227      RegionInfo hri = region.getRegionInfo();
228      byte[] row = hri.getStartKey();
229      for (int i = 0; i < 100; i++) {
230        if (row.length > 0) {
231          Put put = new Put(row);
232          put.addColumn(famName, row, row);
233          region.put(put);
234          totEdits++;
235        }
236      }
237    }
238
239    hbaseAdmin.addReplicationPeer(id,
240      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2))
241        .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName())
242        .build());
243
244    final int numEdits = totEdits;
245    Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() {
246      @Override
247      public boolean evaluate() throws Exception {
248        return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
249      }
250
251      @Override
252      public String explainFailure() throws Exception {
253        String failure = "Failed to replicate all edits, expected = " + numEdits + " replicated = "
254          + InterClusterReplicationEndpointForTest.replicateCount.get();
255        return failure;
256      }
257    });
258
259    hbaseAdmin.removeReplicationPeer("testInterClusterReplication");
260    UTIL1.deleteTableData(tableName);
261  }
262
263  @Test
264  public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
265    ReplicationPeerConfig rpc =
266      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
267        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName())
268        // test that we can create mutliple WALFilters reflectively
269        .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
270          EverythingPassesWALEntryFilter.class.getName() + ","
271            + EverythingPassesWALEntryFilterSubclass.class.getName())
272        .build();
273
274    hbaseAdmin.addReplicationPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
275    // now replicate some data.
276    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
277      doPut(connection, Bytes.toBytes("row1"));
278      doPut(connection, row);
279      doPut(connection, Bytes.toBytes("row2"));
280    }
281
282    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
283      @Override
284      public boolean evaluate() throws Exception {
285        return ReplicationEndpointForTest.replicateCount.get() >= 1;
286      }
287    });
288
289    Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
290    // make sure our reflectively created filter is in the filter chain
291    Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
292    hbaseAdmin.removeReplicationPeer("testWALEntryFilterFromReplicationEndpoint");
293  }
294
295  @Test(expected = IOException.class)
296  public void testWALEntryFilterAddValidation() throws Exception {
297    ReplicationPeerConfig rpc =
298      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
299        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName())
300        // test that we can create mutliple WALFilters reflectively
301        .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
302          "IAmNotARealWalEntryFilter")
303        .build();
304    hbaseAdmin.addReplicationPeer("testWALEntryFilterAddValidation", rpc);
305  }
306
307  @Test(expected = IOException.class)
308  public void testWALEntryFilterUpdateValidation() throws Exception {
309    ReplicationPeerConfig rpc =
310      ReplicationPeerConfig.newBuilder().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
311        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName())
312        // test that we can create mutliple WALFilters reflectively
313        .putConfiguration(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
314          "IAmNotARealWalEntryFilter")
315        .build();
316    hbaseAdmin.updateReplicationPeerConfig("testWALEntryFilterUpdateValidation", rpc);
317  }
318
319  @Test
320  public void testMetricsSourceBaseSourcePassThrough() {
321    /*
322     * The replication MetricsSource wraps a MetricsReplicationTableSourceImpl,
323     * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource, so that
324     * metrics get written to both namespaces. Both of those classes wrap a
325     * MetricsReplicationSourceImpl that implements BaseSource, which allows for custom JMX metrics.
326     * This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls
327     * down through the two layers of wrapping to the actual BaseSource.
328     */
329    String id = "id";
330    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
331    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
332    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
333    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
334    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
335
336    MetricsReplicationSourceSource singleSourceSource =
337      new MetricsReplicationSourceSourceImpl(singleRms, id);
338    MetricsReplicationGlobalSourceSource globalSourceSource =
339      new MetricsReplicationGlobalSourceSourceImpl(globalRms);
340    MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource);
341    doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
342
343    Map<String, MetricsReplicationTableSource> singleSourceSourceByTable = new HashMap<>();
344    MetricsSource source =
345      new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);
346
347    String gaugeName = "gauge";
348    String singleGaugeName = "source.id." + gaugeName;
349    String globalGaugeName = "source." + gaugeName;
350    long delta = 1;
351    String counterName = "counter";
352    String singleCounterName = "source.id." + counterName;
353    String globalCounterName = "source." + counterName;
354    long count = 2;
355    source.decGauge(gaugeName, delta);
356    source.getMetricsContext();
357    source.getMetricsDescription();
358    source.getMetricsJmxContext();
359    source.getMetricsName();
360    source.incCounters(counterName, count);
361    source.incGauge(gaugeName, delta);
362    source.init();
363    source.removeMetric(gaugeName);
364    source.setGauge(gaugeName, delta);
365    source.updateHistogram(counterName, count);
366    source.incrFailedRecoveryQueue();
367
368    verify(singleRms).decGauge(singleGaugeName, delta);
369    verify(globalRms).decGauge(globalGaugeName, delta);
370    verify(globalRms).getMetricsContext();
371    verify(globalRms).getMetricsJmxContext();
372    verify(globalRms).getMetricsName();
373    verify(singleRms).incCounters(singleCounterName, count);
374    verify(globalRms).incCounters(globalCounterName, count);
375    verify(singleRms).incGauge(singleGaugeName, delta);
376    verify(globalRms).incGauge(globalGaugeName, delta);
377    verify(globalRms).init();
378    verify(singleRms).removeMetric(singleGaugeName);
379    verify(globalRms).removeMetric(globalGaugeName);
380    verify(singleRms).setGauge(singleGaugeName, delta);
381    verify(globalRms).setGauge(globalGaugeName, delta);
382    verify(singleRms).updateHistogram(singleCounterName, count);
383    verify(globalRms).updateHistogram(globalCounterName, count);
384    verify(spyglobalSourceSource).incrFailedRecoveryQueue();
385
386    // check singleSourceSourceByTable metrics.
387    // singleSourceSourceByTable map entry will be created only
388    // after calling #setAgeOfLastShippedOpByTable
389    boolean containsRandomNewTable =
390      source.getSingleSourceSourceByTable().containsKey("RandomNewTable");
391    Assert.assertEquals(false, containsRandomNewTable);
392    source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable"));
393    containsRandomNewTable = source.getSingleSourceSourceByTable().containsKey("RandomNewTable");
394    Assert.assertEquals(true, containsRandomNewTable);
395    MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable().get("RandomNewTable");
396
397    // age should be greater than zero we created the entry with time in the past
398    Assert.assertTrue(msr.getLastShippedAge() > 0);
399    Assert.assertTrue(msr.getShippedBytes() > 0);
400
401  }
402
403  private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) {
404    List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>();
405    byte[] a = new byte[] { 'a' };
406    Entry entry = createEntry(tableName, null, a);
407    walEntriesWithSize.add(new Pair<>(entry, 10L));
408    return walEntriesWithSize;
409  }
410
411  private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... kvs) {
412    WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName),
413      EnvironmentEdgeManager.currentTime() - 1L, scopes);
414    WALEdit edit1 = new WALEdit();
415
416    for (byte[] kv : kvs) {
417      WALEditInternalHelper.addExtendedCell(edit1, new KeyValue(kv, kv, kv));
418    }
419    return new Entry(key1, edit1);
420  }
421
422  private void doPut(byte[] row) throws IOException {
423    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
424      doPut(connection, row);
425    }
426  }
427
428  private void doPut(final Connection connection, final byte[] row) throws IOException {
429    try (Table t = connection.getTable(tableName)) {
430      Put put = new Put(row);
431      put.addColumn(famName, row, row);
432      t.put(put);
433    }
434  }
435
436  private static void doAssert(byte[] row) throws Exception {
437    if (ReplicationEndpointForTest.lastEntries == null) {
438      return; // first call
439    }
440    Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
441    List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
442    Assert.assertEquals(1, cells.size());
443    Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(),
444      cells.get(0).getRowLength(), row, 0, row.length));
445  }
446
447  public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
448    static UUID uuid = UTIL1.getRandomUUID();
449    static AtomicInteger contructedCount = new AtomicInteger();
450    static AtomicInteger startedCount = new AtomicInteger();
451    static AtomicInteger stoppedCount = new AtomicInteger();
452    static AtomicInteger replicateCount = new AtomicInteger();
453    static volatile List<Entry> lastEntries = null;
454
455    public ReplicationEndpointForTest() {
456      replicateCount.set(0);
457      contructedCount.incrementAndGet();
458    }
459
460    @Override
461    public UUID getPeerUUID() {
462      return uuid;
463    }
464
465    @Override
466    public boolean replicate(ReplicateContext replicateContext) {
467      replicateCount.incrementAndGet();
468      lastEntries = new ArrayList<>(replicateContext.entries);
469      return true;
470    }
471
472    @Override
473    public void start() {
474      startAsync();
475    }
476
477    @Override
478    public void stop() {
479      stopAsync();
480    }
481
482    @Override
483    protected void doStart() {
484      startedCount.incrementAndGet();
485      notifyStarted();
486    }
487
488    @Override
489    protected void doStop() {
490      stoppedCount.incrementAndGet();
491      notifyStopped();
492    }
493
494    @Override
495    public boolean canReplicateToSameCluster() {
496      return true;
497    }
498  }
499
500  /**
501   * Not used by unit tests, helpful for manual testing with replication.
502   * <p>
503   * Snippet for `hbase shell`:
504   *
505   * <pre>
506   * create 't', 'f'
507   * add_peer '1', ENDPOINT_CLASSNAME =&gt; 'org.apache.hadoop.hbase.replication.' + \
508   *    'TestReplicationEndpoint$SleepingReplicationEndpointForTest'
509   * alter 't', {NAME=&gt;'f', REPLICATION_SCOPE=&gt;1}
510   * </pre>
511   */
512  public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest {
513    private long duration;
514
515    public SleepingReplicationEndpointForTest() {
516      super();
517    }
518
519    @Override
520    public void init(Context context) throws IOException {
521      super.init(context);
522      if (this.ctx != null) {
523        duration = this.ctx.getConfiguration()
524          .getLong("hbase.test.sleep.replication.endpoint.duration.millis", 5000L);
525      }
526    }
527
528    @Override
529    public boolean replicate(ReplicateContext context) {
530      try {
531        Thread.sleep(duration);
532      } catch (InterruptedException e) {
533        Thread.currentThread().interrupt();
534        return false;
535      }
536      return super.replicate(context);
537    }
538  }
539
540  public static class InterClusterReplicationEndpointForTest
541    extends HBaseInterClusterReplicationEndpoint {
542
543    static AtomicInteger replicateCount = new AtomicInteger();
544    static boolean failedOnce;
545
546    public InterClusterReplicationEndpointForTest() {
547      replicateCount.set(0);
548    }
549
550    @Override
551    public boolean replicate(ReplicateContext replicateContext) {
552      boolean success = super.replicate(replicateContext);
553      if (success) {
554        replicateCount.addAndGet(replicateContext.entries.size());
555      }
556      return success;
557    }
558
559    @Override
560    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
561      int timeout) {
562      // Fail only once, we don't want to slow down the test.
563      if (failedOnce) {
564        return CompletableFuture.completedFuture(ordinal);
565      } else {
566        failedOnce = true;
567        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
568        future.completeExceptionally(new IOException("Sample Exception: Failed to replicate."));
569        return future;
570      }
571    }
572  }
573
574  public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
575    static int COUNT = 10;
576    static AtomicReference<Exception> ex = new AtomicReference<>(null);
577    static AtomicBoolean replicated = new AtomicBoolean(false);
578
579    @Override
580    public boolean replicate(ReplicateContext replicateContext) {
581      try {
582        // check row
583        doAssert(row);
584      } catch (Exception e) {
585        ex.set(e);
586      }
587
588      super.replicate(replicateContext);
589      LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get());
590
591      replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false
592      return replicated.get();
593    }
594  }
595
596  // return a WALEntry filter which only accepts "row", but not other rows
597  public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
598    static AtomicReference<Exception> ex = new AtomicReference<>(null);
599
600    @Override
601    public boolean replicate(ReplicateContext replicateContext) {
602      try {
603        super.replicate(replicateContext);
604        doAssert(row);
605      } catch (Exception e) {
606        ex.set(e);
607      }
608      return true;
609    }
610
611    @Override
612    public WALEntryFilter getWALEntryfilter() {
613      return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
614        @Override
615        public Entry filter(Entry entry) {
616          ArrayList<Cell> cells = entry.getEdit().getCells();
617          int size = cells.size();
618          for (int i = size - 1; i >= 0; i--) {
619            Cell cell = cells.get(i);
620            if (
621              !Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), row, 0,
622                row.length)
623            ) {
624              cells.remove(i);
625            }
626          }
627          return entry;
628        }
629      });
630    }
631  }
632
633  public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
634    private static boolean passedEntry = false;
635
636    @Override
637    public Entry filter(Entry entry) {
638      passedEntry = true;
639      return entry;
640    }
641
642    public static boolean hasPassedAnEntry() {
643      return passedEntry;
644    }
645  }
646
647  public static class EverythingPassesWALEntryFilterSubclass
648    extends EverythingPassesWALEntryFilter {
649  }
650}