001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.hamcrest.Matchers.containsString; 023import static org.hamcrest.Matchers.instanceOf; 024import static org.hamcrest.Matchers.startsWith; 025import static org.junit.Assert.assertEquals; 026import static org.junit.Assert.assertFalse; 027import static org.junit.Assert.assertNotNull; 028import static org.junit.Assert.assertNull; 029import static org.junit.Assert.assertThrows; 030import static org.junit.Assert.assertTrue; 031import static org.junit.Assert.fail; 032 033import java.io.IOException; 034import java.util.ArrayList; 035import java.util.HashMap; 036import java.util.HashSet; 037import java.util.List; 038import java.util.Map; 039import java.util.Set; 040import java.util.concurrent.CompletionException; 041import java.util.concurrent.ExecutionException; 042import org.apache.hadoop.hbase.DoNotRetryIOException; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.replication.ReplicationException; 048import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 049import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; 050import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 051import org.apache.hadoop.hbase.replication.ReplicationQueueData; 052import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 053import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 054import org.apache.hadoop.hbase.replication.VerifyWALEntriesReplicationEndpoint; 055import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; 056import org.apache.hadoop.hbase.testclassification.ClientTests; 057import org.apache.hadoop.hbase.testclassification.LargeTests; 058import org.junit.After; 059import org.junit.BeforeClass; 060import org.junit.ClassRule; 061import org.junit.Test; 062import org.junit.experimental.categories.Category; 063import org.junit.runner.RunWith; 064import org.junit.runners.Parameterized; 065 066/** 067 * Class to test asynchronous replication admin operations. 068 */ 069@RunWith(Parameterized.class) 070@Category({ LargeTests.class, ClientTests.class }) 071public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { 072 073 @ClassRule 074 public static final HBaseClassTestRule CLASS_RULE = 075 HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class); 076 077 private final String ID_ONE = "1"; 078 private static String KEY_ONE; 079 private final String ID_TWO = "2"; 080 private static String KEY_TWO; 081 082 @BeforeClass 083 public static void setUpBeforeClass() throws Exception { 084 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); 085 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); 086 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 087 TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); 088 TEST_UTIL.startMiniCluster(); 089 KEY_ONE = TEST_UTIL.getZkConnectionURI() + "-test1"; 090 KEY_TWO = TEST_UTIL.getZkConnectionURI() + "-test2"; 091 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 092 } 093 094 @After 095 public void clearPeerAndQueues() throws IOException, ReplicationException { 096 try { 097 admin.removeReplicationPeer(ID_ONE).join(); 098 } catch (Exception e) { 099 } 100 try { 101 admin.removeReplicationPeer(ID_TWO).join(); 102 } catch (Exception e) { 103 } 104 ReplicationQueueStorage queueStorage = ReplicationStorageFactory 105 .getReplicationQueueStorage(TEST_UTIL.getConnection(), TEST_UTIL.getConfiguration()); 106 for (ReplicationQueueData queueData : queueStorage.listAllQueues()) { 107 queueStorage.removeQueue(queueData.getId()); 108 } 109 admin.replicationPeerModificationSwitch(true).join(); 110 } 111 112 @Test 113 public void testAddRemovePeer() throws Exception { 114 ReplicationPeerConfig rpc1 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(); 115 ReplicationPeerConfig rpc2 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(); 116 // Add a valid peer 117 admin.addReplicationPeer(ID_ONE, rpc1).join(); 118 // try adding the same (fails) 119 try { 120 admin.addReplicationPeer(ID_ONE, rpc1).join(); 121 fail("Test case should fail as adding a same peer."); 122 } catch (CompletionException e) { 123 // OK! 124 } 125 assertEquals(1, admin.listReplicationPeers().get().size()); 126 // Try to remove an inexisting peer 127 try { 128 admin.removeReplicationPeer(ID_TWO).join(); 129 fail("Test case should fail as removing a inexisting peer."); 130 } catch (CompletionException e) { 131 // OK! 132 } 133 assertEquals(1, admin.listReplicationPeers().get().size()); 134 // Add a second since multi-slave is supported 135 admin.addReplicationPeer(ID_TWO, rpc2).join(); 136 assertEquals(2, admin.listReplicationPeers().get().size()); 137 // Remove the first peer we added 138 admin.removeReplicationPeer(ID_ONE).join(); 139 assertEquals(1, admin.listReplicationPeers().get().size()); 140 admin.removeReplicationPeer(ID_TWO).join(); 141 assertEquals(0, admin.listReplicationPeers().get().size()); 142 } 143 144 @Test 145 public void testPeerConfig() throws Exception { 146 ReplicationPeerConfig config = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE) 147 .putConfiguration("key1", "value1").putConfiguration("key2", "value2").build(); 148 admin.addReplicationPeer(ID_ONE, config).join(); 149 150 List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get(); 151 assertEquals(1, peers.size()); 152 ReplicationPeerDescription peerOne = peers.get(0); 153 assertNotNull(peerOne); 154 assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1")); 155 assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2")); 156 157 admin.removeReplicationPeer(ID_ONE).join(); 158 } 159 160 @Test 161 public void testEnableDisablePeer() throws Exception { 162 ReplicationPeerConfig rpc1 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(); 163 admin.addReplicationPeer(ID_ONE, rpc1).join(); 164 List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get(); 165 assertEquals(1, peers.size()); 166 assertTrue(peers.get(0).isEnabled()); 167 168 admin.disableReplicationPeer(ID_ONE).join(); 169 peers = admin.listReplicationPeers().get(); 170 assertEquals(1, peers.size()); 171 assertFalse(peers.get(0).isEnabled()); 172 admin.removeReplicationPeer(ID_ONE).join(); 173 } 174 175 @Test 176 public void testAppendPeerTableCFs() throws Exception { 177 ReplicationPeerConfigBuilder rpcBuilder = 178 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE); 179 final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1"); 180 final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2"); 181 final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3"); 182 final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4"); 183 final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5"); 184 final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6"); 185 186 // Add a valid peer 187 admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join(); 188 rpcBuilder.setReplicateAllUserTables(false); 189 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 190 191 Map<TableName, List<String>> tableCFs = new HashMap<>(); 192 193 // append table t1 to replication 194 tableCFs.put(tableName1, null); 195 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 196 Map<TableName, List<String>> result = 197 admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 198 assertEquals(1, result.size()); 199 assertEquals(true, result.containsKey(tableName1)); 200 assertNull(result.get(tableName1)); 201 202 // append table t2 to replication 203 tableCFs.clear(); 204 tableCFs.put(tableName2, null); 205 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 206 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 207 assertEquals(2, result.size()); 208 assertTrue("Should contain t1", result.containsKey(tableName1)); 209 assertTrue("Should contain t2", result.containsKey(tableName2)); 210 assertNull(result.get(tableName1)); 211 assertNull(result.get(tableName2)); 212 213 // append table column family: f1 of t3 to replication 214 tableCFs.clear(); 215 tableCFs.put(tableName3, new ArrayList<>()); 216 tableCFs.get(tableName3).add("f1"); 217 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 218 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 219 assertEquals(3, result.size()); 220 assertTrue("Should contain t1", result.containsKey(tableName1)); 221 assertTrue("Should contain t2", result.containsKey(tableName2)); 222 assertTrue("Should contain t3", result.containsKey(tableName3)); 223 assertNull(result.get(tableName1)); 224 assertNull(result.get(tableName2)); 225 assertEquals(1, result.get(tableName3).size()); 226 assertEquals("f1", result.get(tableName3).get(0)); 227 228 // append table column family: f1,f2 of t4 to replication 229 tableCFs.clear(); 230 tableCFs.put(tableName4, new ArrayList<>()); 231 tableCFs.get(tableName4).add("f1"); 232 tableCFs.get(tableName4).add("f2"); 233 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 234 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 235 assertEquals(4, result.size()); 236 assertTrue("Should contain t1", result.containsKey(tableName1)); 237 assertTrue("Should contain t2", result.containsKey(tableName2)); 238 assertTrue("Should contain t3", result.containsKey(tableName3)); 239 assertTrue("Should contain t4", result.containsKey(tableName4)); 240 assertNull(result.get(tableName1)); 241 assertNull(result.get(tableName2)); 242 assertEquals(1, result.get(tableName3).size()); 243 assertEquals("f1", result.get(tableName3).get(0)); 244 assertEquals(2, result.get(tableName4).size()); 245 assertEquals("f1", result.get(tableName4).get(0)); 246 assertEquals("f2", result.get(tableName4).get(1)); 247 248 // append "table5" => [], then append "table5" => ["f1"] 249 tableCFs.clear(); 250 tableCFs.put(tableName5, new ArrayList<>()); 251 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 252 tableCFs.clear(); 253 tableCFs.put(tableName5, new ArrayList<>()); 254 tableCFs.get(tableName5).add("f1"); 255 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 256 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 257 assertEquals(5, result.size()); 258 assertTrue("Should contain t5", result.containsKey(tableName5)); 259 // null means replication all cfs of tab5 260 assertNull(result.get(tableName5)); 261 262 // append "table6" => ["f1"], then append "table6" => [] 263 tableCFs.clear(); 264 tableCFs.put(tableName6, new ArrayList<>()); 265 tableCFs.get(tableName6).add("f1"); 266 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 267 tableCFs.clear(); 268 tableCFs.put(tableName6, new ArrayList<>()); 269 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 270 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 271 assertEquals(6, result.size()); 272 assertTrue("Should contain t6", result.containsKey(tableName6)); 273 // null means replication all cfs of tab6 274 assertNull(result.get(tableName6)); 275 276 admin.removeReplicationPeer(ID_ONE).join(); 277 } 278 279 @Test 280 public void testRemovePeerTableCFs() throws Exception { 281 ReplicationPeerConfigBuilder rpcBuilder = 282 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE); 283 final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1"); 284 final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2"); 285 final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3"); 286 final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4"); 287 // Add a valid peer 288 admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join(); 289 rpcBuilder.setReplicateAllUserTables(false); 290 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 291 292 Map<TableName, List<String>> tableCFs = new HashMap<>(); 293 try { 294 tableCFs.put(tableName3, null); 295 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 296 fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null"); 297 } catch (CompletionException e) { 298 assertTrue(e.getCause() instanceof ReplicationException); 299 } 300 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 301 302 tableCFs.clear(); 303 tableCFs.put(tableName1, null); 304 tableCFs.put(tableName2, new ArrayList<>()); 305 tableCFs.get(tableName2).add("cf1"); 306 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 307 try { 308 tableCFs.clear(); 309 tableCFs.put(tableName3, null); 310 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 311 fail("Test case should fail as removing table-cfs from a peer whose" 312 + " table-cfs didn't contain t3"); 313 } catch (CompletionException e) { 314 assertTrue(e.getCause() instanceof ReplicationException); 315 } 316 Map<TableName, List<String>> result = 317 admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 318 assertEquals(2, result.size()); 319 assertTrue("Should contain t1", result.containsKey(tableName1)); 320 assertTrue("Should contain t2", result.containsKey(tableName2)); 321 assertNull(result.get(tableName1)); 322 assertEquals(1, result.get(tableName2).size()); 323 assertEquals("cf1", result.get(tableName2).get(0)); 324 325 try { 326 tableCFs.clear(); 327 tableCFs.put(tableName1, new ArrayList<>()); 328 tableCFs.get(tableName1).add("cf1"); 329 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 330 fail("Test case should fail, because table t1 didn't specify cfs in peer config"); 331 } catch (CompletionException e) { 332 assertTrue(e.getCause() instanceof ReplicationException); 333 } 334 tableCFs.clear(); 335 tableCFs.put(tableName1, null); 336 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 337 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 338 assertEquals(1, result.size()); 339 assertEquals(1, result.get(tableName2).size()); 340 assertEquals("cf1", result.get(tableName2).get(0)); 341 342 try { 343 tableCFs.clear(); 344 tableCFs.put(tableName2, null); 345 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 346 fail("Test case should fail, because table t2 hase specified cfs in peer config"); 347 } catch (CompletionException e) { 348 assertTrue(e.getCause() instanceof ReplicationException); 349 } 350 tableCFs.clear(); 351 tableCFs.put(tableName2, new ArrayList<>()); 352 tableCFs.get(tableName2).add("cf1"); 353 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 354 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 355 356 tableCFs.clear(); 357 tableCFs.put(tableName4, new ArrayList<>()); 358 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 359 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 360 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 361 362 admin.removeReplicationPeer(ID_ONE); 363 } 364 365 @Test 366 public void testSetPeerNamespaces() throws Exception { 367 String ns1 = "ns1"; 368 String ns2 = "ns2"; 369 370 ReplicationPeerConfigBuilder rpcBuilder = 371 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE); 372 admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join(); 373 rpcBuilder.setReplicateAllUserTables(false); 374 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 375 376 // add ns1 and ns2 to peer config 377 Set<String> namespaces = new HashSet<>(); 378 namespaces.add(ns1); 379 namespaces.add(ns2); 380 rpcBuilder.setNamespaces(namespaces); 381 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 382 namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces(); 383 assertEquals(2, namespaces.size()); 384 assertTrue(namespaces.contains(ns1)); 385 assertTrue(namespaces.contains(ns2)); 386 387 // update peer config only contains ns1 388 namespaces = new HashSet<>(); 389 namespaces.add(ns1); 390 rpcBuilder.setNamespaces(namespaces); 391 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 392 namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces(); 393 assertEquals(1, namespaces.size()); 394 assertTrue(namespaces.contains(ns1)); 395 396 admin.removeReplicationPeer(ID_ONE).join(); 397 } 398 399 @Test 400 public void testNamespacesAndTableCfsConfigConflict() throws Exception { 401 String ns1 = "ns1"; 402 String ns2 = "ns2"; 403 final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1"); 404 final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2"); 405 406 ReplicationPeerConfigBuilder rpcBuilder = 407 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE); 408 admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join(); 409 rpcBuilder.setReplicateAllUserTables(false); 410 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 411 412 Set<String> namespaces = new HashSet<String>(); 413 namespaces.add(ns1); 414 rpcBuilder.setNamespaces(namespaces); 415 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).get(); 416 Map<TableName, List<String>> tableCfs = new HashMap<>(); 417 tableCfs.put(tableName1, new ArrayList<>()); 418 rpcBuilder.setTableCFsMap(tableCfs); 419 try { 420 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 421 fail( 422 "Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1); 423 } catch (CompletionException e) { 424 // OK 425 } 426 427 tableCfs.clear(); 428 tableCfs.put(tableName2, new ArrayList<>()); 429 rpcBuilder.setTableCFsMap(tableCfs); 430 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).get(); 431 namespaces.clear(); 432 namespaces.add(ns2); 433 rpcBuilder.setNamespaces(namespaces); 434 try { 435 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 436 fail( 437 "Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2); 438 } catch (CompletionException e) { 439 // OK 440 } 441 442 admin.removeReplicationPeer(ID_ONE).join(); 443 } 444 445 @Test 446 public void testPeerBandwidth() throws Exception { 447 ReplicationPeerConfigBuilder rpcBuilder = 448 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE); 449 450 admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join(); 451 ; 452 assertEquals(0, admin.getReplicationPeerConfig(ID_ONE).get().getBandwidth()); 453 454 rpcBuilder.setBandwidth(2097152); 455 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 456 assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth()); 457 458 admin.removeReplicationPeer(ID_ONE).join(); 459 } 460 461 @Test 462 public void testInvalidClusterKey() throws InterruptedException { 463 try { 464 admin.addReplicationPeer(ID_ONE, 465 ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get(); 466 fail(); 467 } catch (ExecutionException e) { 468 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class)); 469 } 470 } 471 472 @Test 473 public void testClusterKeyWithTrailingSpace() throws Exception { 474 admin.addReplicationPeer(ID_ONE, 475 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE + " ").build()).get(); 476 String clusterKey = admin.getReplicationPeerConfig(ID_ONE).get().getClusterKey(); 477 assertEquals(KEY_ONE, clusterKey); 478 } 479 480 @Test 481 public void testInvalidReplicationEndpoint() throws InterruptedException { 482 try { 483 admin.addReplicationPeer(ID_ONE, 484 ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get(); 485 fail(); 486 } catch (ExecutionException e) { 487 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class)); 488 assertThat(e.getCause().getMessage(), startsWith("Can not instantiate")); 489 } 490 } 491 492 @Test 493 public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException { 494 // make sure that we do not need to set cluster key when we use customized ReplicationEndpoint 495 admin 496 .addReplicationPeer(ID_ONE, 497 ReplicationPeerConfig.newBuilder() 498 .setReplicationEndpointImpl(VerifyWALEntriesReplicationEndpoint.class.getName()).build()) 499 .get(); 500 501 // but we still need to check cluster key if we specify the default ReplicationEndpoint 502 try { 503 admin 504 .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder() 505 .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build()) 506 .get(); 507 fail(); 508 } catch (ExecutionException e) { 509 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class)); 510 } 511 } 512 513 /** 514 * Tests that admin api throws ReplicationPeerNotFoundException if peer doesn't exist. 515 */ 516 @Test 517 public void testReplicationPeerNotFoundException() throws InterruptedException { 518 String dummyPeer = "dummy_peer"; 519 try { 520 admin.removeReplicationPeer(dummyPeer).get(); 521 fail(); 522 } catch (ExecutionException e) { 523 assertThat(e.getCause(), instanceOf(ReplicationPeerNotFoundException.class)); 524 } 525 } 526 527 @Test 528 public void testReplicationPeerModificationSwitch() throws Exception { 529 assertTrue(admin.isReplicationPeerModificationEnabled().get()); 530 // disable modification, should returns true as it is enabled by default and the above 531 // assertion has confirmed it 532 assertTrue(admin.replicationPeerModificationSwitch(false).get()); 533 ExecutionException error = assertThrows(ExecutionException.class, () -> admin 534 .addReplicationPeer(ID_ONE, ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build()) 535 .get()); 536 assertThat(error.getCause().getMessage(), 537 containsString("Replication peer modification disabled")); 538 // enable again, and the previous value should be false 539 assertFalse(admin.replicationPeerModificationSwitch(true).get()); 540 } 541}