001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.hamcrest.Matchers.containsString;
023import static org.hamcrest.Matchers.instanceOf;
024import static org.hamcrest.Matchers.startsWith;
025import static org.junit.Assert.assertEquals;
026import static org.junit.Assert.assertFalse;
027import static org.junit.Assert.assertNotNull;
028import static org.junit.Assert.assertNull;
029import static org.junit.Assert.assertThrows;
030import static org.junit.Assert.assertTrue;
031import static org.junit.Assert.fail;
032
033import java.io.IOException;
034import java.util.ArrayList;
035import java.util.HashMap;
036import java.util.HashSet;
037import java.util.List;
038import java.util.Map;
039import java.util.Set;
040import java.util.concurrent.CompletionException;
041import java.util.concurrent.ExecutionException;
042import org.apache.hadoop.hbase.DoNotRetryIOException;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.replication.ReplicationException;
049import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
050import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
051import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
052import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
053import org.apache.hadoop.hbase.replication.VerifyWALEntriesReplicationEndpoint;
054import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
055import org.apache.hadoop.hbase.testclassification.ClientTests;
056import org.apache.hadoop.hbase.testclassification.LargeTests;
057import org.junit.After;
058import org.junit.BeforeClass;
059import org.junit.ClassRule;
060import org.junit.Test;
061import org.junit.experimental.categories.Category;
062import org.junit.runner.RunWith;
063import org.junit.runners.Parameterized;
064
065/**
066 * Class to test asynchronous replication admin operations.
067 */
068@RunWith(Parameterized.class)
069@Category({ LargeTests.class, ClientTests.class })
070public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
071
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074    HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
075
076  private final String ID_ONE = "1";
077  private static String KEY_ONE;
078  private final String ID_TWO = "2";
079  private static String KEY_TWO;
080
081  @BeforeClass
082  public static void setUpBeforeClass() throws Exception {
083    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
084    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
085    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
086    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
087    TEST_UTIL.startMiniCluster();
088    KEY_ONE = TEST_UTIL.getClusterKey() + "-test1";
089    KEY_TWO = TEST_UTIL.getClusterKey() + "-test2";
090    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
091  }
092
093  @After
094  public void clearPeerAndQueues() throws IOException, ReplicationException {
095    try {
096      admin.removeReplicationPeer(ID_ONE).join();
097    } catch (Exception e) {
098    }
099    try {
100      admin.removeReplicationPeer(ID_TWO).join();
101    } catch (Exception e) {
102    }
103    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
104      .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
105    for (ServerName serverName : queueStorage.getListOfReplicators()) {
106      for (String queue : queueStorage.getAllQueues(serverName)) {
107        queueStorage.removeQueue(serverName, queue);
108      }
109    }
110    admin.replicationPeerModificationSwitch(true).join();
111  }
112
113  @Test
114  public void testAddRemovePeer() throws Exception {
115    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
116    rpc1.setClusterKey(KEY_ONE);
117    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
118    rpc2.setClusterKey(KEY_TWO);
119    // Add a valid peer
120    admin.addReplicationPeer(ID_ONE, rpc1).join();
121    // try adding the same (fails)
122    try {
123      admin.addReplicationPeer(ID_ONE, rpc1).join();
124      fail("Test case should fail as adding a same peer.");
125    } catch (CompletionException e) {
126      // OK!
127    }
128    assertEquals(1, admin.listReplicationPeers().get().size());
129    // Try to remove an inexisting peer
130    try {
131      admin.removeReplicationPeer(ID_TWO).join();
132      fail("Test case should fail as removing a inexisting peer.");
133    } catch (CompletionException e) {
134      // OK!
135    }
136    assertEquals(1, admin.listReplicationPeers().get().size());
137    // Add a second since multi-slave is supported
138    admin.addReplicationPeer(ID_TWO, rpc2).join();
139    assertEquals(2, admin.listReplicationPeers().get().size());
140    // Remove the first peer we added
141    admin.removeReplicationPeer(ID_ONE).join();
142    assertEquals(1, admin.listReplicationPeers().get().size());
143    admin.removeReplicationPeer(ID_TWO).join();
144    assertEquals(0, admin.listReplicationPeers().get().size());
145  }
146
147  @Test
148  public void testPeerConfig() throws Exception {
149    ReplicationPeerConfig config = new ReplicationPeerConfig();
150    config.setClusterKey(KEY_ONE);
151    config.getConfiguration().put("key1", "value1");
152    config.getConfiguration().put("key2", "value2");
153    admin.addReplicationPeer(ID_ONE, config).join();
154
155    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
156    assertEquals(1, peers.size());
157    ReplicationPeerDescription peerOne = peers.get(0);
158    assertNotNull(peerOne);
159    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
160    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));
161
162    admin.removeReplicationPeer(ID_ONE).join();
163  }
164
165  @Test
166  public void testEnableDisablePeer() throws Exception {
167    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
168    rpc1.setClusterKey(KEY_ONE);
169    admin.addReplicationPeer(ID_ONE, rpc1).join();
170    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
171    assertEquals(1, peers.size());
172    assertTrue(peers.get(0).isEnabled());
173
174    admin.disableReplicationPeer(ID_ONE).join();
175    peers = admin.listReplicationPeers().get();
176    assertEquals(1, peers.size());
177    assertFalse(peers.get(0).isEnabled());
178    admin.removeReplicationPeer(ID_ONE).join();
179  }
180
181  @Test
182  public void testAppendPeerTableCFs() throws Exception {
183    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
184    rpc1.setClusterKey(KEY_ONE);
185    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
186    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
187    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
188    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
189    final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5");
190    final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6");
191
192    // Add a valid peer
193    admin.addReplicationPeer(ID_ONE, rpc1).join();
194    rpc1.setReplicateAllUserTables(false);
195    admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
196
197    Map<TableName, List<String>> tableCFs = new HashMap<>();
198
199    // append table t1 to replication
200    tableCFs.put(tableName1, null);
201    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
202    Map<TableName, List<String>> result =
203      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
204    assertEquals(1, result.size());
205    assertEquals(true, result.containsKey(tableName1));
206    assertNull(result.get(tableName1));
207
208    // append table t2 to replication
209    tableCFs.clear();
210    tableCFs.put(tableName2, null);
211    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
212    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
213    assertEquals(2, result.size());
214    assertTrue("Should contain t1", result.containsKey(tableName1));
215    assertTrue("Should contain t2", result.containsKey(tableName2));
216    assertNull(result.get(tableName1));
217    assertNull(result.get(tableName2));
218
219    // append table column family: f1 of t3 to replication
220    tableCFs.clear();
221    tableCFs.put(tableName3, new ArrayList<>());
222    tableCFs.get(tableName3).add("f1");
223    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
224    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
225    assertEquals(3, result.size());
226    assertTrue("Should contain t1", result.containsKey(tableName1));
227    assertTrue("Should contain t2", result.containsKey(tableName2));
228    assertTrue("Should contain t3", result.containsKey(tableName3));
229    assertNull(result.get(tableName1));
230    assertNull(result.get(tableName2));
231    assertEquals(1, result.get(tableName3).size());
232    assertEquals("f1", result.get(tableName3).get(0));
233
234    // append table column family: f1,f2 of t4 to replication
235    tableCFs.clear();
236    tableCFs.put(tableName4, new ArrayList<>());
237    tableCFs.get(tableName4).add("f1");
238    tableCFs.get(tableName4).add("f2");
239    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
240    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
241    assertEquals(4, result.size());
242    assertTrue("Should contain t1", result.containsKey(tableName1));
243    assertTrue("Should contain t2", result.containsKey(tableName2));
244    assertTrue("Should contain t3", result.containsKey(tableName3));
245    assertTrue("Should contain t4", result.containsKey(tableName4));
246    assertNull(result.get(tableName1));
247    assertNull(result.get(tableName2));
248    assertEquals(1, result.get(tableName3).size());
249    assertEquals("f1", result.get(tableName3).get(0));
250    assertEquals(2, result.get(tableName4).size());
251    assertEquals("f1", result.get(tableName4).get(0));
252    assertEquals("f2", result.get(tableName4).get(1));
253
254    // append "table5" => [], then append "table5" => ["f1"]
255    tableCFs.clear();
256    tableCFs.put(tableName5, new ArrayList<>());
257    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
258    tableCFs.clear();
259    tableCFs.put(tableName5, new ArrayList<>());
260    tableCFs.get(tableName5).add("f1");
261    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
262    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
263    assertEquals(5, result.size());
264    assertTrue("Should contain t5", result.containsKey(tableName5));
265    // null means replication all cfs of tab5
266    assertNull(result.get(tableName5));
267
268    // append "table6" => ["f1"], then append "table6" => []
269    tableCFs.clear();
270    tableCFs.put(tableName6, new ArrayList<>());
271    tableCFs.get(tableName6).add("f1");
272    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
273    tableCFs.clear();
274    tableCFs.put(tableName6, new ArrayList<>());
275    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
276    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
277    assertEquals(6, result.size());
278    assertTrue("Should contain t6", result.containsKey(tableName6));
279    // null means replication all cfs of tab6
280    assertNull(result.get(tableName6));
281
282    admin.removeReplicationPeer(ID_ONE).join();
283  }
284
285  @Test
286  public void testRemovePeerTableCFs() throws Exception {
287    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
288    rpc1.setClusterKey(KEY_ONE);
289    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
290    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
291    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
292    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
293    // Add a valid peer
294    admin.addReplicationPeer(ID_ONE, rpc1).join();
295    rpc1.setReplicateAllUserTables(false);
296    admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
297
298    Map<TableName, List<String>> tableCFs = new HashMap<>();
299    try {
300      tableCFs.put(tableName3, null);
301      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
302      fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null");
303    } catch (CompletionException e) {
304      assertTrue(e.getCause() instanceof ReplicationException);
305    }
306    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
307
308    tableCFs.clear();
309    tableCFs.put(tableName1, null);
310    tableCFs.put(tableName2, new ArrayList<>());
311    tableCFs.get(tableName2).add("cf1");
312    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
313    try {
314      tableCFs.clear();
315      tableCFs.put(tableName3, null);
316      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
317      fail("Test case should fail as removing table-cfs from a peer whose"
318        + " table-cfs didn't contain t3");
319    } catch (CompletionException e) {
320      assertTrue(e.getCause() instanceof ReplicationException);
321    }
322    Map<TableName, List<String>> result =
323      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
324    assertEquals(2, result.size());
325    assertTrue("Should contain t1", result.containsKey(tableName1));
326    assertTrue("Should contain t2", result.containsKey(tableName2));
327    assertNull(result.get(tableName1));
328    assertEquals(1, result.get(tableName2).size());
329    assertEquals("cf1", result.get(tableName2).get(0));
330
331    try {
332      tableCFs.clear();
333      tableCFs.put(tableName1, new ArrayList<>());
334      tableCFs.get(tableName1).add("cf1");
335      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
336      fail("Test case should fail, because table t1 didn't specify cfs in peer config");
337    } catch (CompletionException e) {
338      assertTrue(e.getCause() instanceof ReplicationException);
339    }
340    tableCFs.clear();
341    tableCFs.put(tableName1, null);
342    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
343    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
344    assertEquals(1, result.size());
345    assertEquals(1, result.get(tableName2).size());
346    assertEquals("cf1", result.get(tableName2).get(0));
347
348    try {
349      tableCFs.clear();
350      tableCFs.put(tableName2, null);
351      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
352      fail("Test case should fail, because table t2 hase specified cfs in peer config");
353    } catch (CompletionException e) {
354      assertTrue(e.getCause() instanceof ReplicationException);
355    }
356    tableCFs.clear();
357    tableCFs.put(tableName2, new ArrayList<>());
358    tableCFs.get(tableName2).add("cf1");
359    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
360    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
361
362    tableCFs.clear();
363    tableCFs.put(tableName4, new ArrayList<>());
364    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
365    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
366    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
367
368    admin.removeReplicationPeer(ID_ONE);
369  }
370
371  @Test
372  public void testSetPeerNamespaces() throws Exception {
373    String ns1 = "ns1";
374    String ns2 = "ns2";
375
376    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
377    rpc.setClusterKey(KEY_ONE);
378    admin.addReplicationPeer(ID_ONE, rpc).join();
379    rpc.setReplicateAllUserTables(false);
380    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
381
382    // add ns1 and ns2 to peer config
383    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
384    Set<String> namespaces = new HashSet<>();
385    namespaces.add(ns1);
386    namespaces.add(ns2);
387    rpc.setNamespaces(namespaces);
388    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
389    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
390    assertEquals(2, namespaces.size());
391    assertTrue(namespaces.contains(ns1));
392    assertTrue(namespaces.contains(ns2));
393
394    // update peer config only contains ns1
395    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
396    namespaces = new HashSet<>();
397    namespaces.add(ns1);
398    rpc.setNamespaces(namespaces);
399    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
400    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
401    assertEquals(1, namespaces.size());
402    assertTrue(namespaces.contains(ns1));
403
404    admin.removeReplicationPeer(ID_ONE).join();
405  }
406
407  @Test
408  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
409    String ns1 = "ns1";
410    String ns2 = "ns2";
411    final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1");
412    final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2");
413
414    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
415    rpc.setClusterKey(KEY_ONE);
416    admin.addReplicationPeer(ID_ONE, rpc).join();
417    rpc.setReplicateAllUserTables(false);
418    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
419
420    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
421    Set<String> namespaces = new HashSet<String>();
422    namespaces.add(ns1);
423    rpc.setNamespaces(namespaces);
424    admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
425    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
426    Map<TableName, List<String>> tableCfs = new HashMap<>();
427    tableCfs.put(tableName1, new ArrayList<>());
428    rpc.setTableCFsMap(tableCfs);
429    try {
430      admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
431      fail(
432        "Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
433    } catch (CompletionException e) {
434      // OK
435    }
436
437    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
438    tableCfs.clear();
439    tableCfs.put(tableName2, new ArrayList<>());
440    rpc.setTableCFsMap(tableCfs);
441    admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
442    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
443    namespaces.clear();
444    namespaces.add(ns2);
445    rpc.setNamespaces(namespaces);
446    try {
447      admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
448      fail(
449        "Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
450    } catch (CompletionException e) {
451      // OK
452    }
453
454    admin.removeReplicationPeer(ID_ONE).join();
455  }
456
457  @Test
458  public void testPeerBandwidth() throws Exception {
459    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
460    rpc.setClusterKey(KEY_ONE);
461
462    admin.addReplicationPeer(ID_ONE, rpc).join();
463    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
464    assertEquals(0, rpc.getBandwidth());
465
466    rpc.setBandwidth(2097152);
467    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
468    assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth());
469
470    admin.removeReplicationPeer(ID_ONE).join();
471  }
472
473  @Test
474  public void testInvalidClusterKey() throws InterruptedException {
475    try {
476      admin.addReplicationPeer(ID_ONE,
477        ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get();
478      fail();
479    } catch (ExecutionException e) {
480      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
481    }
482  }
483
484  @Test
485  public void testClusterKeyWithTrailingSpace() throws Exception {
486    admin.addReplicationPeer(ID_ONE,
487      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE + " ").build()).get();
488    String clusterKey = admin.getReplicationPeerConfig(ID_ONE).get().getClusterKey();
489    assertEquals(KEY_ONE, clusterKey);
490  }
491
492  @Test
493  public void testInvalidReplicationEndpoint() throws InterruptedException {
494    try {
495      admin.addReplicationPeer(ID_ONE,
496        ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get();
497      fail();
498    } catch (ExecutionException e) {
499      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
500      assertThat(e.getCause().getMessage(), startsWith("Can not instantiate"));
501    }
502  }
503
504  @Test
505  public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException {
506    // make sure that we do not need to set cluster key when we use customized ReplicationEndpoint
507    admin
508      .addReplicationPeer(ID_ONE,
509        ReplicationPeerConfig.newBuilder()
510          .setReplicationEndpointImpl(VerifyWALEntriesReplicationEndpoint.class.getName()).build())
511      .get();
512
513    // but we still need to check cluster key if we specify the default ReplicationEndpoint
514    try {
515      admin
516        .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder()
517          .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build())
518        .get();
519      fail();
520    } catch (ExecutionException e) {
521      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
522    }
523  }
524
525  /**
526   * Tests that admin api throws ReplicationPeerNotFoundException if peer doesn't exist.
527   */
528  @Test
529  public void testReplicationPeerNotFoundException() throws InterruptedException {
530    String dummyPeer = "dummy_peer";
531    try {
532      admin.removeReplicationPeer(dummyPeer).get();
533      fail();
534    } catch (ExecutionException e) {
535      assertThat(e.getCause(), instanceOf(ReplicationPeerNotFoundException.class));
536    }
537  }
538
539  @Test
540  public void testReplicationPeerModificationSwitch() throws Exception {
541    assertTrue(admin.isReplicationPeerModificationEnabled().get());
542    // disable modification, should returns true as it is enabled by default and the above
543    // assertion has confirmed it
544    assertTrue(admin.replicationPeerModificationSwitch(false).get());
545    ExecutionException error = assertThrows(ExecutionException.class, () -> admin
546      .addReplicationPeer(ID_ONE, ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build())
547      .get());
548    assertThat(error.getCause().getMessage(),
549      containsString("Replication peer modification disabled"));
550    // enable again, and the previous value should be false
551    assertFalse(admin.replicationPeerModificationSwitch(true).get());
552  }
553}