001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.hamcrest.Matchers.containsString; 023import static org.hamcrest.Matchers.instanceOf; 024import static org.hamcrest.Matchers.startsWith; 025import static org.junit.Assert.assertEquals; 026import static org.junit.Assert.assertFalse; 027import static org.junit.Assert.assertNotNull; 028import static org.junit.Assert.assertNull; 029import static org.junit.Assert.assertThrows; 030import static org.junit.Assert.assertTrue; 031import static org.junit.Assert.fail; 032 033import java.io.IOException; 034import java.util.ArrayList; 035import java.util.HashMap; 036import java.util.HashSet; 037import java.util.List; 038import java.util.Map; 039import java.util.Set; 040import java.util.concurrent.CompletionException; 041import java.util.concurrent.ExecutionException; 042import org.apache.hadoop.hbase.DoNotRetryIOException; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; 046import org.apache.hadoop.hbase.ServerName; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.replication.ReplicationException; 049import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 050import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 051import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 052import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 053import org.apache.hadoop.hbase.replication.VerifyWALEntriesReplicationEndpoint; 054import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; 055import org.apache.hadoop.hbase.testclassification.ClientTests; 056import org.apache.hadoop.hbase.testclassification.LargeTests; 057import org.junit.After; 058import org.junit.BeforeClass; 059import org.junit.ClassRule; 060import org.junit.Test; 061import org.junit.experimental.categories.Category; 062import org.junit.runner.RunWith; 063import org.junit.runners.Parameterized; 064 065/** 066 * Class to test asynchronous replication admin operations. 067 */ 068@RunWith(Parameterized.class) 069@Category({ LargeTests.class, ClientTests.class }) 070public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { 071 072 @ClassRule 073 public static final HBaseClassTestRule CLASS_RULE = 074 HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class); 075 076 private final String ID_ONE = "1"; 077 private static String KEY_ONE; 078 private final String ID_TWO = "2"; 079 private static String KEY_TWO; 080 081 @BeforeClass 082 public static void setUpBeforeClass() throws Exception { 083 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); 084 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); 085 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 086 TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); 087 TEST_UTIL.startMiniCluster(); 088 KEY_ONE = TEST_UTIL.getClusterKey() + "-test1"; 089 KEY_TWO = TEST_UTIL.getClusterKey() + "-test2"; 090 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 091 } 092 093 @After 094 public void clearPeerAndQueues() throws IOException, ReplicationException { 095 try { 096 admin.removeReplicationPeer(ID_ONE).join(); 097 } catch (Exception e) { 098 } 099 try { 100 admin.removeReplicationPeer(ID_TWO).join(); 101 } catch (Exception e) { 102 } 103 ReplicationQueueStorage queueStorage = ReplicationStorageFactory 104 .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration()); 105 for (ServerName serverName : queueStorage.getListOfReplicators()) { 106 for (String queue : queueStorage.getAllQueues(serverName)) { 107 queueStorage.removeQueue(serverName, queue); 108 } 109 } 110 admin.replicationPeerModificationSwitch(true).join(); 111 } 112 113 @Test 114 public void testAddRemovePeer() throws Exception { 115 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 116 rpc1.setClusterKey(KEY_ONE); 117 ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); 118 rpc2.setClusterKey(KEY_TWO); 119 // Add a valid peer 120 admin.addReplicationPeer(ID_ONE, rpc1).join(); 121 // try adding the same (fails) 122 try { 123 admin.addReplicationPeer(ID_ONE, rpc1).join(); 124 fail("Test case should fail as adding a same peer."); 125 } catch (CompletionException e) { 126 // OK! 127 } 128 assertEquals(1, admin.listReplicationPeers().get().size()); 129 // Try to remove an inexisting peer 130 try { 131 admin.removeReplicationPeer(ID_TWO).join(); 132 fail("Test case should fail as removing a inexisting peer."); 133 } catch (CompletionException e) { 134 // OK! 135 } 136 assertEquals(1, admin.listReplicationPeers().get().size()); 137 // Add a second since multi-slave is supported 138 admin.addReplicationPeer(ID_TWO, rpc2).join(); 139 assertEquals(2, admin.listReplicationPeers().get().size()); 140 // Remove the first peer we added 141 admin.removeReplicationPeer(ID_ONE).join(); 142 assertEquals(1, admin.listReplicationPeers().get().size()); 143 admin.removeReplicationPeer(ID_TWO).join(); 144 assertEquals(0, admin.listReplicationPeers().get().size()); 145 } 146 147 @Test 148 public void testPeerConfig() throws Exception { 149 ReplicationPeerConfig config = new ReplicationPeerConfig(); 150 config.setClusterKey(KEY_ONE); 151 config.getConfiguration().put("key1", "value1"); 152 config.getConfiguration().put("key2", "value2"); 153 admin.addReplicationPeer(ID_ONE, config).join(); 154 155 List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get(); 156 assertEquals(1, peers.size()); 157 ReplicationPeerDescription peerOne = peers.get(0); 158 assertNotNull(peerOne); 159 assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1")); 160 assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2")); 161 162 admin.removeReplicationPeer(ID_ONE).join(); 163 } 164 165 @Test 166 public void testEnableDisablePeer() throws Exception { 167 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 168 rpc1.setClusterKey(KEY_ONE); 169 admin.addReplicationPeer(ID_ONE, rpc1).join(); 170 List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get(); 171 assertEquals(1, peers.size()); 172 assertTrue(peers.get(0).isEnabled()); 173 174 admin.disableReplicationPeer(ID_ONE).join(); 175 peers = admin.listReplicationPeers().get(); 176 assertEquals(1, peers.size()); 177 assertFalse(peers.get(0).isEnabled()); 178 admin.removeReplicationPeer(ID_ONE).join(); 179 } 180 181 @Test 182 public void testAppendPeerTableCFs() throws Exception { 183 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 184 rpc1.setClusterKey(KEY_ONE); 185 final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1"); 186 final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2"); 187 final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3"); 188 final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4"); 189 final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5"); 190 final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6"); 191 192 // Add a valid peer 193 admin.addReplicationPeer(ID_ONE, rpc1).join(); 194 rpc1.setReplicateAllUserTables(false); 195 admin.updateReplicationPeerConfig(ID_ONE, rpc1).join(); 196 197 Map<TableName, List<String>> tableCFs = new HashMap<>(); 198 199 // append table t1 to replication 200 tableCFs.put(tableName1, null); 201 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 202 Map<TableName, List<String>> result = 203 admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 204 assertEquals(1, result.size()); 205 assertEquals(true, result.containsKey(tableName1)); 206 assertNull(result.get(tableName1)); 207 208 // append table t2 to replication 209 tableCFs.clear(); 210 tableCFs.put(tableName2, null); 211 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 212 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 213 assertEquals(2, result.size()); 214 assertTrue("Should contain t1", result.containsKey(tableName1)); 215 assertTrue("Should contain t2", result.containsKey(tableName2)); 216 assertNull(result.get(tableName1)); 217 assertNull(result.get(tableName2)); 218 219 // append table column family: f1 of t3 to replication 220 tableCFs.clear(); 221 tableCFs.put(tableName3, new ArrayList<>()); 222 tableCFs.get(tableName3).add("f1"); 223 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 224 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 225 assertEquals(3, result.size()); 226 assertTrue("Should contain t1", result.containsKey(tableName1)); 227 assertTrue("Should contain t2", result.containsKey(tableName2)); 228 assertTrue("Should contain t3", result.containsKey(tableName3)); 229 assertNull(result.get(tableName1)); 230 assertNull(result.get(tableName2)); 231 assertEquals(1, result.get(tableName3).size()); 232 assertEquals("f1", result.get(tableName3).get(0)); 233 234 // append table column family: f1,f2 of t4 to replication 235 tableCFs.clear(); 236 tableCFs.put(tableName4, new ArrayList<>()); 237 tableCFs.get(tableName4).add("f1"); 238 tableCFs.get(tableName4).add("f2"); 239 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 240 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 241 assertEquals(4, result.size()); 242 assertTrue("Should contain t1", result.containsKey(tableName1)); 243 assertTrue("Should contain t2", result.containsKey(tableName2)); 244 assertTrue("Should contain t3", result.containsKey(tableName3)); 245 assertTrue("Should contain t4", result.containsKey(tableName4)); 246 assertNull(result.get(tableName1)); 247 assertNull(result.get(tableName2)); 248 assertEquals(1, result.get(tableName3).size()); 249 assertEquals("f1", result.get(tableName3).get(0)); 250 assertEquals(2, result.get(tableName4).size()); 251 assertEquals("f1", result.get(tableName4).get(0)); 252 assertEquals("f2", result.get(tableName4).get(1)); 253 254 // append "table5" => [], then append "table5" => ["f1"] 255 tableCFs.clear(); 256 tableCFs.put(tableName5, new ArrayList<>()); 257 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 258 tableCFs.clear(); 259 tableCFs.put(tableName5, new ArrayList<>()); 260 tableCFs.get(tableName5).add("f1"); 261 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 262 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 263 assertEquals(5, result.size()); 264 assertTrue("Should contain t5", result.containsKey(tableName5)); 265 // null means replication all cfs of tab5 266 assertNull(result.get(tableName5)); 267 268 // append "table6" => ["f1"], then append "table6" => [] 269 tableCFs.clear(); 270 tableCFs.put(tableName6, new ArrayList<>()); 271 tableCFs.get(tableName6).add("f1"); 272 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 273 tableCFs.clear(); 274 tableCFs.put(tableName6, new ArrayList<>()); 275 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 276 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 277 assertEquals(6, result.size()); 278 assertTrue("Should contain t6", result.containsKey(tableName6)); 279 // null means replication all cfs of tab6 280 assertNull(result.get(tableName6)); 281 282 admin.removeReplicationPeer(ID_ONE).join(); 283 } 284 285 @Test 286 public void testRemovePeerTableCFs() throws Exception { 287 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 288 rpc1.setClusterKey(KEY_ONE); 289 final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1"); 290 final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2"); 291 final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3"); 292 final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4"); 293 // Add a valid peer 294 admin.addReplicationPeer(ID_ONE, rpc1).join(); 295 rpc1.setReplicateAllUserTables(false); 296 admin.updateReplicationPeerConfig(ID_ONE, rpc1).join(); 297 298 Map<TableName, List<String>> tableCFs = new HashMap<>(); 299 try { 300 tableCFs.put(tableName3, null); 301 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 302 fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null"); 303 } catch (CompletionException e) { 304 assertTrue(e.getCause() instanceof ReplicationException); 305 } 306 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 307 308 tableCFs.clear(); 309 tableCFs.put(tableName1, null); 310 tableCFs.put(tableName2, new ArrayList<>()); 311 tableCFs.get(tableName2).add("cf1"); 312 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 313 try { 314 tableCFs.clear(); 315 tableCFs.put(tableName3, null); 316 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 317 fail("Test case should fail as removing table-cfs from a peer whose" 318 + " table-cfs didn't contain t3"); 319 } catch (CompletionException e) { 320 assertTrue(e.getCause() instanceof ReplicationException); 321 } 322 Map<TableName, List<String>> result = 323 admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 324 assertEquals(2, result.size()); 325 assertTrue("Should contain t1", result.containsKey(tableName1)); 326 assertTrue("Should contain t2", result.containsKey(tableName2)); 327 assertNull(result.get(tableName1)); 328 assertEquals(1, result.get(tableName2).size()); 329 assertEquals("cf1", result.get(tableName2).get(0)); 330 331 try { 332 tableCFs.clear(); 333 tableCFs.put(tableName1, new ArrayList<>()); 334 tableCFs.get(tableName1).add("cf1"); 335 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 336 fail("Test case should fail, because table t1 didn't specify cfs in peer config"); 337 } catch (CompletionException e) { 338 assertTrue(e.getCause() instanceof ReplicationException); 339 } 340 tableCFs.clear(); 341 tableCFs.put(tableName1, null); 342 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 343 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 344 assertEquals(1, result.size()); 345 assertEquals(1, result.get(tableName2).size()); 346 assertEquals("cf1", result.get(tableName2).get(0)); 347 348 try { 349 tableCFs.clear(); 350 tableCFs.put(tableName2, null); 351 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 352 fail("Test case should fail, because table t2 hase specified cfs in peer config"); 353 } catch (CompletionException e) { 354 assertTrue(e.getCause() instanceof ReplicationException); 355 } 356 tableCFs.clear(); 357 tableCFs.put(tableName2, new ArrayList<>()); 358 tableCFs.get(tableName2).add("cf1"); 359 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 360 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 361 362 tableCFs.clear(); 363 tableCFs.put(tableName4, new ArrayList<>()); 364 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 365 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 366 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 367 368 admin.removeReplicationPeer(ID_ONE); 369 } 370 371 @Test 372 public void testSetPeerNamespaces() throws Exception { 373 String ns1 = "ns1"; 374 String ns2 = "ns2"; 375 376 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 377 rpc.setClusterKey(KEY_ONE); 378 admin.addReplicationPeer(ID_ONE, rpc).join(); 379 rpc.setReplicateAllUserTables(false); 380 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 381 382 // add ns1 and ns2 to peer config 383 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 384 Set<String> namespaces = new HashSet<>(); 385 namespaces.add(ns1); 386 namespaces.add(ns2); 387 rpc.setNamespaces(namespaces); 388 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 389 namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces(); 390 assertEquals(2, namespaces.size()); 391 assertTrue(namespaces.contains(ns1)); 392 assertTrue(namespaces.contains(ns2)); 393 394 // update peer config only contains ns1 395 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 396 namespaces = new HashSet<>(); 397 namespaces.add(ns1); 398 rpc.setNamespaces(namespaces); 399 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 400 namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces(); 401 assertEquals(1, namespaces.size()); 402 assertTrue(namespaces.contains(ns1)); 403 404 admin.removeReplicationPeer(ID_ONE).join(); 405 } 406 407 @Test 408 public void testNamespacesAndTableCfsConfigConflict() throws Exception { 409 String ns1 = "ns1"; 410 String ns2 = "ns2"; 411 final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1"); 412 final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2"); 413 414 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 415 rpc.setClusterKey(KEY_ONE); 416 admin.addReplicationPeer(ID_ONE, rpc).join(); 417 rpc.setReplicateAllUserTables(false); 418 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 419 420 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 421 Set<String> namespaces = new HashSet<String>(); 422 namespaces.add(ns1); 423 rpc.setNamespaces(namespaces); 424 admin.updateReplicationPeerConfig(ID_ONE, rpc).get(); 425 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 426 Map<TableName, List<String>> tableCfs = new HashMap<>(); 427 tableCfs.put(tableName1, new ArrayList<>()); 428 rpc.setTableCFsMap(tableCfs); 429 try { 430 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 431 fail( 432 "Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1); 433 } catch (CompletionException e) { 434 // OK 435 } 436 437 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 438 tableCfs.clear(); 439 tableCfs.put(tableName2, new ArrayList<>()); 440 rpc.setTableCFsMap(tableCfs); 441 admin.updateReplicationPeerConfig(ID_ONE, rpc).get(); 442 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 443 namespaces.clear(); 444 namespaces.add(ns2); 445 rpc.setNamespaces(namespaces); 446 try { 447 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 448 fail( 449 "Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2); 450 } catch (CompletionException e) { 451 // OK 452 } 453 454 admin.removeReplicationPeer(ID_ONE).join(); 455 } 456 457 @Test 458 public void testPeerBandwidth() throws Exception { 459 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 460 rpc.setClusterKey(KEY_ONE); 461 462 admin.addReplicationPeer(ID_ONE, rpc).join(); 463 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 464 assertEquals(0, rpc.getBandwidth()); 465 466 rpc.setBandwidth(2097152); 467 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 468 assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth()); 469 470 admin.removeReplicationPeer(ID_ONE).join(); 471 } 472 473 @Test 474 public void testInvalidClusterKey() throws InterruptedException { 475 try { 476 admin.addReplicationPeer(ID_ONE, 477 ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get(); 478 fail(); 479 } catch (ExecutionException e) { 480 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class)); 481 } 482 } 483 484 @Test 485 public void testClusterKeyWithTrailingSpace() throws Exception { 486 admin.addReplicationPeer(ID_ONE, 487 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE + " ").build()).get(); 488 String clusterKey = admin.getReplicationPeerConfig(ID_ONE).get().getClusterKey(); 489 assertEquals(KEY_ONE, clusterKey); 490 } 491 492 @Test 493 public void testInvalidReplicationEndpoint() throws InterruptedException { 494 try { 495 admin.addReplicationPeer(ID_ONE, 496 ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get(); 497 fail(); 498 } catch (ExecutionException e) { 499 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class)); 500 assertThat(e.getCause().getMessage(), startsWith("Can not instantiate")); 501 } 502 } 503 504 @Test 505 public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException { 506 // make sure that we do not need to set cluster key when we use customized ReplicationEndpoint 507 admin 508 .addReplicationPeer(ID_ONE, 509 ReplicationPeerConfig.newBuilder() 510 .setReplicationEndpointImpl(VerifyWALEntriesReplicationEndpoint.class.getName()).build()) 511 .get(); 512 513 // but we still need to check cluster key if we specify the default ReplicationEndpoint 514 try { 515 admin 516 .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder() 517 .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build()) 518 .get(); 519 fail(); 520 } catch (ExecutionException e) { 521 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class)); 522 } 523 } 524 525 /** 526 * Tests that admin api throws ReplicationPeerNotFoundException if peer doesn't exist. 527 */ 528 @Test 529 public void testReplicationPeerNotFoundException() throws InterruptedException { 530 String dummyPeer = "dummy_peer"; 531 try { 532 admin.removeReplicationPeer(dummyPeer).get(); 533 fail(); 534 } catch (ExecutionException e) { 535 assertThat(e.getCause(), instanceOf(ReplicationPeerNotFoundException.class)); 536 } 537 } 538 539 @Test 540 public void testReplicationPeerModificationSwitch() throws Exception { 541 assertTrue(admin.isReplicationPeerModificationEnabled().get()); 542 // disable modification, should returns true as it is enabled by default and the above 543 // assertion has confirmed it 544 assertTrue(admin.replicationPeerModificationSwitch(false).get()); 545 ExecutionException error = assertThrows(ExecutionException.class, () -> admin 546 .addReplicationPeer(ID_ONE, ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build()) 547 .get()); 548 assertThat(error.getCause().getMessage(), 549 containsString("Replication peer modification disabled")); 550 // enable again, and the previous value should be false 551 assertFalse(admin.replicationPeerModificationSwitch(true).get()); 552 } 553}