001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.hamcrest.Matchers.containsString;
023import static org.hamcrest.Matchers.instanceOf;
024import static org.hamcrest.Matchers.startsWith;
025import static org.junit.Assert.assertEquals;
026import static org.junit.Assert.assertFalse;
027import static org.junit.Assert.assertNotNull;
028import static org.junit.Assert.assertNull;
029import static org.junit.Assert.assertThrows;
030import static org.junit.Assert.assertTrue;
031import static org.junit.Assert.fail;
032
033import java.io.IOException;
034import java.util.ArrayList;
035import java.util.HashMap;
036import java.util.HashSet;
037import java.util.List;
038import java.util.Map;
039import java.util.Set;
040import java.util.concurrent.CompletionException;
041import java.util.concurrent.ExecutionException;
042import org.apache.hadoop.hbase.DoNotRetryIOException;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.replication.ReplicationException;
048import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
049import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
050import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
051import org.apache.hadoop.hbase.replication.ReplicationQueueData;
052import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
053import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
054import org.apache.hadoop.hbase.replication.VerifyWALEntriesReplicationEndpoint;
055import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
056import org.apache.hadoop.hbase.testclassification.ClientTests;
057import org.apache.hadoop.hbase.testclassification.LargeTests;
058import org.junit.After;
059import org.junit.BeforeClass;
060import org.junit.ClassRule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063import org.junit.runner.RunWith;
064import org.junit.runners.Parameterized;
065
066/**
067 * Class to test asynchronous replication admin operations.
068 */
069@RunWith(Parameterized.class)
070@Category({ LargeTests.class, ClientTests.class })
071public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
072
073  @ClassRule
074  public static final HBaseClassTestRule CLASS_RULE =
075    HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
076
077  private final String ID_ONE = "1";
078  private static String KEY_ONE;
079  private final String ID_TWO = "2";
080  private static String KEY_TWO;
081
082  @BeforeClass
083  public static void setUpBeforeClass() throws Exception {
084    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
085    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
086    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
087    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
088    TEST_UTIL.startMiniCluster();
089    KEY_ONE = TEST_UTIL.getZkConnectionURI() + "-test1";
090    KEY_TWO = TEST_UTIL.getZkConnectionURI() + "-test2";
091    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
092  }
093
094  @After
095  public void clearPeerAndQueues() throws IOException, ReplicationException {
096    try {
097      admin.removeReplicationPeer(ID_ONE).join();
098    } catch (Exception e) {
099    }
100    try {
101      admin.removeReplicationPeer(ID_TWO).join();
102    } catch (Exception e) {
103    }
104    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
105      .getReplicationQueueStorage(TEST_UTIL.getConnection(), TEST_UTIL.getConfiguration());
106    for (ReplicationQueueData queueData : queueStorage.listAllQueues()) {
107      queueStorage.removeQueue(queueData.getId());
108    }
109    admin.replicationPeerModificationSwitch(true).join();
110  }
111
112  @Test
113  public void testAddRemovePeer() throws Exception {
114    ReplicationPeerConfig rpc1 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build();
115    ReplicationPeerConfig rpc2 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build();
116    // Add a valid peer
117    admin.addReplicationPeer(ID_ONE, rpc1).join();
118    // try adding the same (fails)
119    try {
120      admin.addReplicationPeer(ID_ONE, rpc1).join();
121      fail("Test case should fail as adding a same peer.");
122    } catch (CompletionException e) {
123      // OK!
124    }
125    assertEquals(1, admin.listReplicationPeers().get().size());
126    // Try to remove an inexisting peer
127    try {
128      admin.removeReplicationPeer(ID_TWO).join();
129      fail("Test case should fail as removing a inexisting peer.");
130    } catch (CompletionException e) {
131      // OK!
132    }
133    assertEquals(1, admin.listReplicationPeers().get().size());
134    // Add a second since multi-slave is supported
135    admin.addReplicationPeer(ID_TWO, rpc2).join();
136    assertEquals(2, admin.listReplicationPeers().get().size());
137    // Remove the first peer we added
138    admin.removeReplicationPeer(ID_ONE).join();
139    assertEquals(1, admin.listReplicationPeers().get().size());
140    admin.removeReplicationPeer(ID_TWO).join();
141    assertEquals(0, admin.listReplicationPeers().get().size());
142  }
143
144  @Test
145  public void testPeerConfig() throws Exception {
146    ReplicationPeerConfig config = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE)
147      .putConfiguration("key1", "value1").putConfiguration("key2", "value2").build();
148    admin.addReplicationPeer(ID_ONE, config).join();
149
150    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
151    assertEquals(1, peers.size());
152    ReplicationPeerDescription peerOne = peers.get(0);
153    assertNotNull(peerOne);
154    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
155    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));
156
157    admin.removeReplicationPeer(ID_ONE).join();
158  }
159
160  @Test
161  public void testEnableDisablePeer() throws Exception {
162    ReplicationPeerConfig rpc1 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build();
163    admin.addReplicationPeer(ID_ONE, rpc1).join();
164    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
165    assertEquals(1, peers.size());
166    assertTrue(peers.get(0).isEnabled());
167
168    admin.disableReplicationPeer(ID_ONE).join();
169    peers = admin.listReplicationPeers().get();
170    assertEquals(1, peers.size());
171    assertFalse(peers.get(0).isEnabled());
172    admin.removeReplicationPeer(ID_ONE).join();
173  }
174
175  @Test
176  public void testAppendPeerTableCFs() throws Exception {
177    ReplicationPeerConfigBuilder rpcBuilder =
178      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE);
179    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
180    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
181    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
182    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
183    final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5");
184    final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6");
185
186    // Add a valid peer
187    admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join();
188    rpcBuilder.setReplicateAllUserTables(false);
189    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
190
191    Map<TableName, List<String>> tableCFs = new HashMap<>();
192
193    // append table t1 to replication
194    tableCFs.put(tableName1, null);
195    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
196    Map<TableName, List<String>> result =
197      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
198    assertEquals(1, result.size());
199    assertEquals(true, result.containsKey(tableName1));
200    assertNull(result.get(tableName1));
201
202    // append table t2 to replication
203    tableCFs.clear();
204    tableCFs.put(tableName2, null);
205    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
206    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
207    assertEquals(2, result.size());
208    assertTrue("Should contain t1", result.containsKey(tableName1));
209    assertTrue("Should contain t2", result.containsKey(tableName2));
210    assertNull(result.get(tableName1));
211    assertNull(result.get(tableName2));
212
213    // append table column family: f1 of t3 to replication
214    tableCFs.clear();
215    tableCFs.put(tableName3, new ArrayList<>());
216    tableCFs.get(tableName3).add("f1");
217    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
218    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
219    assertEquals(3, result.size());
220    assertTrue("Should contain t1", result.containsKey(tableName1));
221    assertTrue("Should contain t2", result.containsKey(tableName2));
222    assertTrue("Should contain t3", result.containsKey(tableName3));
223    assertNull(result.get(tableName1));
224    assertNull(result.get(tableName2));
225    assertEquals(1, result.get(tableName3).size());
226    assertEquals("f1", result.get(tableName3).get(0));
227
228    // append table column family: f1,f2 of t4 to replication
229    tableCFs.clear();
230    tableCFs.put(tableName4, new ArrayList<>());
231    tableCFs.get(tableName4).add("f1");
232    tableCFs.get(tableName4).add("f2");
233    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
234    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
235    assertEquals(4, result.size());
236    assertTrue("Should contain t1", result.containsKey(tableName1));
237    assertTrue("Should contain t2", result.containsKey(tableName2));
238    assertTrue("Should contain t3", result.containsKey(tableName3));
239    assertTrue("Should contain t4", result.containsKey(tableName4));
240    assertNull(result.get(tableName1));
241    assertNull(result.get(tableName2));
242    assertEquals(1, result.get(tableName3).size());
243    assertEquals("f1", result.get(tableName3).get(0));
244    assertEquals(2, result.get(tableName4).size());
245    assertEquals("f1", result.get(tableName4).get(0));
246    assertEquals("f2", result.get(tableName4).get(1));
247
248    // append "table5" => [], then append "table5" => ["f1"]
249    tableCFs.clear();
250    tableCFs.put(tableName5, new ArrayList<>());
251    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
252    tableCFs.clear();
253    tableCFs.put(tableName5, new ArrayList<>());
254    tableCFs.get(tableName5).add("f1");
255    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
256    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
257    assertEquals(5, result.size());
258    assertTrue("Should contain t5", result.containsKey(tableName5));
259    // null means replication all cfs of tab5
260    assertNull(result.get(tableName5));
261
262    // append "table6" => ["f1"], then append "table6" => []
263    tableCFs.clear();
264    tableCFs.put(tableName6, new ArrayList<>());
265    tableCFs.get(tableName6).add("f1");
266    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
267    tableCFs.clear();
268    tableCFs.put(tableName6, new ArrayList<>());
269    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
270    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
271    assertEquals(6, result.size());
272    assertTrue("Should contain t6", result.containsKey(tableName6));
273    // null means replication all cfs of tab6
274    assertNull(result.get(tableName6));
275
276    admin.removeReplicationPeer(ID_ONE).join();
277  }
278
279  @Test
280  public void testRemovePeerTableCFs() throws Exception {
281    ReplicationPeerConfigBuilder rpcBuilder =
282      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE);
283    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
284    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
285    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
286    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
287    // Add a valid peer
288    admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join();
289    rpcBuilder.setReplicateAllUserTables(false);
290    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
291
292    Map<TableName, List<String>> tableCFs = new HashMap<>();
293    try {
294      tableCFs.put(tableName3, null);
295      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
296      fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null");
297    } catch (CompletionException e) {
298      assertTrue(e.getCause() instanceof ReplicationException);
299    }
300    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
301
302    tableCFs.clear();
303    tableCFs.put(tableName1, null);
304    tableCFs.put(tableName2, new ArrayList<>());
305    tableCFs.get(tableName2).add("cf1");
306    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
307    try {
308      tableCFs.clear();
309      tableCFs.put(tableName3, null);
310      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
311      fail("Test case should fail as removing table-cfs from a peer whose"
312        + " table-cfs didn't contain t3");
313    } catch (CompletionException e) {
314      assertTrue(e.getCause() instanceof ReplicationException);
315    }
316    Map<TableName, List<String>> result =
317      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
318    assertEquals(2, result.size());
319    assertTrue("Should contain t1", result.containsKey(tableName1));
320    assertTrue("Should contain t2", result.containsKey(tableName2));
321    assertNull(result.get(tableName1));
322    assertEquals(1, result.get(tableName2).size());
323    assertEquals("cf1", result.get(tableName2).get(0));
324
325    try {
326      tableCFs.clear();
327      tableCFs.put(tableName1, new ArrayList<>());
328      tableCFs.get(tableName1).add("cf1");
329      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
330      fail("Test case should fail, because table t1 didn't specify cfs in peer config");
331    } catch (CompletionException e) {
332      assertTrue(e.getCause() instanceof ReplicationException);
333    }
334    tableCFs.clear();
335    tableCFs.put(tableName1, null);
336    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
337    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
338    assertEquals(1, result.size());
339    assertEquals(1, result.get(tableName2).size());
340    assertEquals("cf1", result.get(tableName2).get(0));
341
342    try {
343      tableCFs.clear();
344      tableCFs.put(tableName2, null);
345      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
346      fail("Test case should fail, because table t2 hase specified cfs in peer config");
347    } catch (CompletionException e) {
348      assertTrue(e.getCause() instanceof ReplicationException);
349    }
350    tableCFs.clear();
351    tableCFs.put(tableName2, new ArrayList<>());
352    tableCFs.get(tableName2).add("cf1");
353    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
354    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
355
356    tableCFs.clear();
357    tableCFs.put(tableName4, new ArrayList<>());
358    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
359    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
360    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
361
362    admin.removeReplicationPeer(ID_ONE);
363  }
364
365  @Test
366  public void testSetPeerNamespaces() throws Exception {
367    String ns1 = "ns1";
368    String ns2 = "ns2";
369
370    ReplicationPeerConfigBuilder rpcBuilder =
371      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE);
372    admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join();
373    rpcBuilder.setReplicateAllUserTables(false);
374    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
375
376    // add ns1 and ns2 to peer config
377    Set<String> namespaces = new HashSet<>();
378    namespaces.add(ns1);
379    namespaces.add(ns2);
380    rpcBuilder.setNamespaces(namespaces);
381    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
382    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
383    assertEquals(2, namespaces.size());
384    assertTrue(namespaces.contains(ns1));
385    assertTrue(namespaces.contains(ns2));
386
387    // update peer config only contains ns1
388    namespaces = new HashSet<>();
389    namespaces.add(ns1);
390    rpcBuilder.setNamespaces(namespaces);
391    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
392    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
393    assertEquals(1, namespaces.size());
394    assertTrue(namespaces.contains(ns1));
395
396    admin.removeReplicationPeer(ID_ONE).join();
397  }
398
399  @Test
400  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
401    String ns1 = "ns1";
402    String ns2 = "ns2";
403    final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1");
404    final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2");
405
406    ReplicationPeerConfigBuilder rpcBuilder =
407      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE);
408    admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join();
409    rpcBuilder.setReplicateAllUserTables(false);
410    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
411
412    Set<String> namespaces = new HashSet<String>();
413    namespaces.add(ns1);
414    rpcBuilder.setNamespaces(namespaces);
415    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).get();
416    Map<TableName, List<String>> tableCfs = new HashMap<>();
417    tableCfs.put(tableName1, new ArrayList<>());
418    rpcBuilder.setTableCFsMap(tableCfs);
419    try {
420      admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
421      fail(
422        "Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
423    } catch (CompletionException e) {
424      // OK
425    }
426
427    tableCfs.clear();
428    tableCfs.put(tableName2, new ArrayList<>());
429    rpcBuilder.setTableCFsMap(tableCfs);
430    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).get();
431    namespaces.clear();
432    namespaces.add(ns2);
433    rpcBuilder.setNamespaces(namespaces);
434    try {
435      admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
436      fail(
437        "Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
438    } catch (CompletionException e) {
439      // OK
440    }
441
442    admin.removeReplicationPeer(ID_ONE).join();
443  }
444
445  @Test
446  public void testPeerBandwidth() throws Exception {
447    ReplicationPeerConfigBuilder rpcBuilder =
448      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE);
449
450    admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join();
451    ;
452    assertEquals(0, admin.getReplicationPeerConfig(ID_ONE).get().getBandwidth());
453
454    rpcBuilder.setBandwidth(2097152);
455    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
456    assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth());
457
458    admin.removeReplicationPeer(ID_ONE).join();
459  }
460
461  @Test
462  public void testInvalidClusterKey() throws InterruptedException {
463    try {
464      admin.addReplicationPeer(ID_ONE,
465        ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get();
466      fail();
467    } catch (ExecutionException e) {
468      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
469    }
470  }
471
472  @Test
473  public void testClusterKeyWithTrailingSpace() throws Exception {
474    admin.addReplicationPeer(ID_ONE,
475      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE + " ").build()).get();
476    String clusterKey = admin.getReplicationPeerConfig(ID_ONE).get().getClusterKey();
477    assertEquals(KEY_ONE, clusterKey);
478  }
479
480  @Test
481  public void testInvalidReplicationEndpoint() throws InterruptedException {
482    try {
483      admin.addReplicationPeer(ID_ONE,
484        ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get();
485      fail();
486    } catch (ExecutionException e) {
487      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
488      assertThat(e.getCause().getMessage(), startsWith("Can not instantiate"));
489    }
490  }
491
492  @Test
493  public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException {
494    // make sure that we do not need to set cluster key when we use customized ReplicationEndpoint
495    admin
496      .addReplicationPeer(ID_ONE,
497        ReplicationPeerConfig.newBuilder()
498          .setReplicationEndpointImpl(VerifyWALEntriesReplicationEndpoint.class.getName()).build())
499      .get();
500
501    // but we still need to check cluster key if we specify the default ReplicationEndpoint
502    try {
503      admin
504        .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder()
505          .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build())
506        .get();
507      fail();
508    } catch (ExecutionException e) {
509      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
510    }
511  }
512
513  /**
514   * Tests that admin api throws ReplicationPeerNotFoundException if peer doesn't exist.
515   */
516  @Test
517  public void testReplicationPeerNotFoundException() throws InterruptedException {
518    String dummyPeer = "dummy_peer";
519    try {
520      admin.removeReplicationPeer(dummyPeer).get();
521      fail();
522    } catch (ExecutionException e) {
523      assertThat(e.getCause(), instanceOf(ReplicationPeerNotFoundException.class));
524    }
525  }
526
527  @Test
528  public void testReplicationPeerModificationSwitch() throws Exception {
529    assertTrue(admin.isReplicationPeerModificationEnabled().get());
530    // disable modification, should returns true as it is enabled by default and the above
531    // assertion has confirmed it
532    assertTrue(admin.replicationPeerModificationSwitch(false).get());
533    ExecutionException error = assertThrows(ExecutionException.class, () -> admin
534      .addReplicationPeer(ID_ONE, ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build())
535      .get());
536    assertThat(error.getCause().getMessage(),
537      containsString("Replication peer modification disabled"));
538    // enable again, and the previous value should be false
539    assertFalse(admin.replicationPeerModificationSwitch(true).get());
540  }
541}