001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client.replication; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025import static org.junit.Assert.fail; 026 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.HashMap; 030import java.util.HashSet; 031import java.util.List; 032import java.util.Map; 033import java.util.Set; 034import java.util.concurrent.atomic.AtomicLong; 035import java.util.regex.Pattern; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.client.Admin; 044import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint; 045import org.apache.hadoop.hbase.replication.ReplicationException; 046import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 047import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; 048import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 049import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 050import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 051import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest; 052import org.apache.hadoop.hbase.testclassification.ClientTests; 053import org.apache.hadoop.hbase.testclassification.MediumTests; 054import org.junit.After; 055import org.junit.AfterClass; 056import org.junit.BeforeClass; 057import org.junit.ClassRule; 058import org.junit.Rule; 059import org.junit.Test; 060import org.junit.experimental.categories.Category; 061import org.junit.rules.TestName; 062import org.slf4j.Logger; 063import org.slf4j.LoggerFactory; 064 065/** 066 * Unit testing of ReplicationAdmin 067 */ 068@Category({ MediumTests.class, ClientTests.class }) 069public class TestReplicationAdmin { 070 071 @ClassRule 072 public static final HBaseClassTestRule CLASS_RULE = 073 HBaseClassTestRule.forClass(TestReplicationAdmin.class); 074 075 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationAdmin.class); 076 077 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 078 079 private final String ID_ONE = "1"; 080 private static String KEY_ONE; 081 private final String ID_SECOND = "2"; 082 private static String KEY_SECOND; 083 084 private static ReplicationAdmin admin; 085 private static Admin hbaseAdmin; 086 087 @Rule 088 public TestName name = new TestName(); 089 090 /** 091 * @throws java.lang.Exception 092 */ 093 @BeforeClass 094 public static void setUpBeforeClass() throws Exception { 095 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 096 TEST_UTIL.startMiniCluster(); 097 admin = new ReplicationAdmin(TEST_UTIL.getConfiguration()); 098 hbaseAdmin = TEST_UTIL.getAdmin(); 099 KEY_ONE = TEST_UTIL.getClusterKey() + "-test1"; 100 KEY_SECOND = TEST_UTIL.getClusterKey() + "-test2"; 101 } 102 103 @AfterClass 104 public static void tearDownAfterClass() throws Exception { 105 if (admin != null) { 106 admin.close(); 107 } 108 TEST_UTIL.shutdownMiniCluster(); 109 } 110 111 @After 112 public void tearDown() throws Exception { 113 for (ReplicationPeerDescription desc : hbaseAdmin.listReplicationPeers()) { 114 hbaseAdmin.removeReplicationPeer(desc.getPeerId()); 115 } 116 ReplicationQueueStorage queueStorage = ReplicationStorageFactory 117 .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration()); 118 for (ServerName serverName : queueStorage.getListOfReplicators()) { 119 for (String queue : queueStorage.getAllQueues(serverName)) { 120 queueStorage.removeQueue(serverName, queue); 121 } 122 queueStorage.removeReplicatorIfQueueIsEmpty(serverName); 123 } 124 } 125 126 @Test 127 public void testConcurrentPeerOperations() throws Exception { 128 int threadNum = 5; 129 AtomicLong successCount = new AtomicLong(0); 130 131 // Test concurrent add peer operation 132 Thread[] addPeers = new Thread[threadNum]; 133 for (int i = 0; i < threadNum; i++) { 134 addPeers[i] = new Thread(() -> { 135 try { 136 hbaseAdmin.addReplicationPeer(ID_ONE, 137 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build()); 138 successCount.incrementAndGet(); 139 } catch (Exception e) { 140 LOG.debug("Got exception when add replication peer", e); 141 } 142 }); 143 addPeers[i].start(); 144 } 145 for (Thread addPeer : addPeers) { 146 addPeer.join(); 147 } 148 assertEquals(1, successCount.get()); 149 150 // Test concurrent remove peer operation 151 successCount.set(0); 152 Thread[] removePeers = new Thread[threadNum]; 153 for (int i = 0; i < threadNum; i++) { 154 removePeers[i] = new Thread(() -> { 155 try { 156 hbaseAdmin.removeReplicationPeer(ID_ONE); 157 successCount.incrementAndGet(); 158 } catch (Exception e) { 159 LOG.debug("Got exception when remove replication peer", e); 160 } 161 }); 162 removePeers[i].start(); 163 } 164 for (Thread removePeer : removePeers) { 165 removePeer.join(); 166 } 167 assertEquals(1, successCount.get()); 168 169 // Test concurrent add peer operation again 170 successCount.set(0); 171 addPeers = new Thread[threadNum]; 172 for (int i = 0; i < threadNum; i++) { 173 addPeers[i] = new Thread(() -> { 174 try { 175 hbaseAdmin.addReplicationPeer(ID_ONE, 176 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build()); 177 successCount.incrementAndGet(); 178 } catch (Exception e) { 179 LOG.debug("Got exception when add replication peer", e); 180 } 181 }); 182 addPeers[i].start(); 183 } 184 for (Thread addPeer : addPeers) { 185 addPeer.join(); 186 } 187 assertEquals(1, successCount.get()); 188 } 189 190 @Test 191 public void testAddInvalidPeer() { 192 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); 193 builder.setClusterKey(KEY_ONE); 194 try { 195 String invalidPeerId = "1-2"; 196 hbaseAdmin.addReplicationPeer(invalidPeerId, builder.build()); 197 fail("Should fail as the peer id: " + invalidPeerId + " is invalid"); 198 } catch (Exception e) { 199 // OK 200 } 201 202 try { 203 String invalidClusterKey = "2181:/hbase"; 204 builder.setClusterKey(invalidClusterKey); 205 hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); 206 fail("Should fail as the peer cluster key: " + invalidClusterKey + " is invalid"); 207 } catch (Exception e) { 208 // OK 209 } 210 } 211 212 /** 213 * Simple testing of adding and removing peers, basically shows that all interactions with ZK work 214 */ 215 @Test 216 public void testAddRemovePeer() throws Exception { 217 ReplicationPeerConfigBuilder rpc1 = ReplicationPeerConfig.newBuilder(); 218 rpc1.setClusterKey(KEY_ONE); 219 ReplicationPeerConfigBuilder rpc2 = ReplicationPeerConfig.newBuilder(); 220 rpc2.setClusterKey(KEY_SECOND); 221 // Add a valid peer 222 hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build()); 223 // try adding the same (fails) 224 try { 225 hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build()); 226 } catch (Exception e) { 227 // OK! 228 } 229 assertEquals(1, hbaseAdmin.listReplicationPeers().size()); 230 // Try to remove an inexisting peer 231 try { 232 hbaseAdmin.removeReplicationPeer(ID_SECOND); 233 fail(); 234 } catch (Exception e) { 235 // OK! 236 } 237 assertEquals(1, hbaseAdmin.listReplicationPeers().size()); 238 // Add a second since multi-slave is supported 239 try { 240 hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2.build()); 241 } catch (Exception e) { 242 fail(); 243 } 244 assertEquals(2, hbaseAdmin.listReplicationPeers().size()); 245 // Remove the first peer we added 246 hbaseAdmin.removeReplicationPeer(ID_ONE); 247 assertEquals(1, hbaseAdmin.listReplicationPeers().size()); 248 hbaseAdmin.removeReplicationPeer(ID_SECOND); 249 assertEquals(0, hbaseAdmin.listReplicationPeers().size()); 250 } 251 252 @Test 253 public void testAddPeerWithState() throws Exception { 254 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 255 rpc1.setClusterKey(KEY_ONE); 256 hbaseAdmin.addReplicationPeer(ID_ONE, rpc1, true); 257 assertTrue(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_ONE)).get(0).isEnabled()); 258 hbaseAdmin.removeReplicationPeer(ID_ONE); 259 260 ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); 261 rpc2.setClusterKey(KEY_SECOND); 262 hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2, false); 263 assertFalse(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_SECOND)).get(0).isEnabled()); 264 hbaseAdmin.removeReplicationPeer(ID_SECOND); 265 } 266 267 /** 268 * Tests that the peer configuration used by ReplicationAdmin contains all the peer's properties. 269 */ 270 @Test 271 public void testPeerConfig() throws Exception { 272 ReplicationPeerConfig config = new ReplicationPeerConfig(); 273 config.setClusterKey(KEY_ONE); 274 config.getConfiguration().put("key1", "value1"); 275 config.getConfiguration().put("key2", "value2"); 276 hbaseAdmin.addReplicationPeer(ID_ONE, config); 277 278 List<ReplicationPeerDescription> peers = hbaseAdmin.listReplicationPeers(); 279 assertEquals(1, peers.size()); 280 ReplicationPeerDescription peerOne = peers.get(0); 281 assertNotNull(peerOne); 282 assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1")); 283 assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2")); 284 285 hbaseAdmin.removeReplicationPeer(ID_ONE); 286 } 287 288 @Test 289 public void testAddPeerWithUnDeletedQueues() throws Exception { 290 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 291 rpc1.setClusterKey(KEY_ONE); 292 ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); 293 rpc2.setClusterKey(KEY_SECOND); 294 Configuration conf = TEST_UTIL.getConfiguration(); 295 ReplicationQueueStorage queueStorage = 296 ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), conf); 297 298 ServerName serverName = ServerName.valueOf("server1", 8000, 1234); 299 // add queue for ID_ONE 300 queueStorage.addWAL(serverName, ID_ONE, "file1"); 301 try { 302 admin.addPeer(ID_ONE, rpc1, null); 303 fail(); 304 } catch (Exception e) { 305 // OK! 306 } 307 queueStorage.removeQueue(serverName, ID_ONE); 308 assertEquals(0, queueStorage.getAllQueues(serverName).size()); 309 310 // add recovered queue for ID_ONE 311 queueStorage.addWAL(serverName, ID_ONE + "-server2", "file1"); 312 try { 313 admin.addPeer(ID_ONE, rpc2, null); 314 fail(); 315 } catch (Exception e) { 316 // OK! 317 } 318 } 319 320 /** 321 * basic checks that when we add a peer that it is enabled, and that we can disable 322 */ 323 @Test 324 public void testEnableDisable() throws Exception { 325 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 326 rpc1.setClusterKey(KEY_ONE); 327 admin.addPeer(ID_ONE, rpc1, null); 328 assertEquals(1, admin.getPeersCount()); 329 assertTrue(admin.getPeerState(ID_ONE)); 330 admin.disablePeer(ID_ONE); 331 332 assertFalse(admin.getPeerState(ID_ONE)); 333 try { 334 admin.getPeerState(ID_SECOND); 335 } catch (ReplicationPeerNotFoundException e) { 336 // OK! 337 } 338 admin.removePeer(ID_ONE); 339 } 340 341 @Test 342 public void testAppendPeerTableCFs() throws Exception { 343 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 344 rpc.setClusterKey(KEY_ONE); 345 final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1"); 346 final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2"); 347 final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3"); 348 final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4"); 349 final TableName tableName5 = TableName.valueOf(name.getMethodName() + "t5"); 350 final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6"); 351 352 // Add a valid peer 353 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 354 355 // Update peer config, not replicate all user tables 356 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 357 rpc.setReplicateAllUserTables(false); 358 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 359 360 Map<TableName, List<String>> tableCFs = new HashMap<>(); 361 tableCFs.put(tableName1, null); 362 admin.appendPeerTableCFs(ID_ONE, tableCFs); 363 Map<TableName, List<String>> result = 364 ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 365 assertEquals(1, result.size()); 366 assertEquals(true, result.containsKey(tableName1)); 367 assertNull(result.get(tableName1)); 368 369 // append table t2 to replication 370 tableCFs.clear(); 371 tableCFs.put(tableName2, null); 372 admin.appendPeerTableCFs(ID_ONE, tableCFs); 373 result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 374 assertEquals(2, result.size()); 375 assertTrue("Should contain t1", result.containsKey(tableName1)); 376 assertTrue("Should contain t2", result.containsKey(tableName2)); 377 assertNull(result.get(tableName1)); 378 assertNull(result.get(tableName2)); 379 380 // append table column family: f1 of t3 to replication 381 tableCFs.clear(); 382 tableCFs.put(tableName3, new ArrayList<>()); 383 tableCFs.get(tableName3).add("f1"); 384 admin.appendPeerTableCFs(ID_ONE, tableCFs); 385 result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 386 assertEquals(3, result.size()); 387 assertTrue("Should contain t1", result.containsKey(tableName1)); 388 assertTrue("Should contain t2", result.containsKey(tableName2)); 389 assertTrue("Should contain t3", result.containsKey(tableName3)); 390 assertNull(result.get(tableName1)); 391 assertNull(result.get(tableName2)); 392 assertEquals(1, result.get(tableName3).size()); 393 assertEquals("f1", result.get(tableName3).get(0)); 394 395 tableCFs.clear(); 396 tableCFs.put(tableName4, new ArrayList<>()); 397 tableCFs.get(tableName4).add("f1"); 398 tableCFs.get(tableName4).add("f2"); 399 admin.appendPeerTableCFs(ID_ONE, tableCFs); 400 result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 401 assertEquals(4, result.size()); 402 assertTrue("Should contain t1", result.containsKey(tableName1)); 403 assertTrue("Should contain t2", result.containsKey(tableName2)); 404 assertTrue("Should contain t3", result.containsKey(tableName3)); 405 assertTrue("Should contain t4", result.containsKey(tableName4)); 406 assertNull(result.get(tableName1)); 407 assertNull(result.get(tableName2)); 408 assertEquals(1, result.get(tableName3).size()); 409 assertEquals("f1", result.get(tableName3).get(0)); 410 assertEquals(2, result.get(tableName4).size()); 411 assertEquals("f1", result.get(tableName4).get(0)); 412 assertEquals("f2", result.get(tableName4).get(1)); 413 414 // append "table5" => [], then append "table5" => ["f1"] 415 tableCFs.clear(); 416 tableCFs.put(tableName5, new ArrayList<>()); 417 admin.appendPeerTableCFs(ID_ONE, tableCFs); 418 tableCFs.clear(); 419 tableCFs.put(tableName5, new ArrayList<>()); 420 tableCFs.get(tableName5).add("f1"); 421 admin.appendPeerTableCFs(ID_ONE, tableCFs); 422 result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 423 assertEquals(5, result.size()); 424 assertTrue("Should contain t5", result.containsKey(tableName5)); 425 // null means replication all cfs of tab5 426 assertNull(result.get(tableName5)); 427 428 // append "table6" => ["f1"], then append "table6" => [] 429 tableCFs.clear(); 430 tableCFs.put(tableName6, new ArrayList<>()); 431 tableCFs.get(tableName6).add("f1"); 432 admin.appendPeerTableCFs(ID_ONE, tableCFs); 433 tableCFs.clear(); 434 tableCFs.put(tableName6, new ArrayList<>()); 435 admin.appendPeerTableCFs(ID_ONE, tableCFs); 436 result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 437 assertEquals(6, result.size()); 438 assertTrue("Should contain t6", result.containsKey(tableName6)); 439 // null means replication all cfs of tab6 440 assertNull(result.get(tableName6)); 441 442 admin.removePeer(ID_ONE); 443 } 444 445 @Test 446 public void testRemovePeerTableCFs() throws Exception { 447 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 448 rpc.setClusterKey(KEY_ONE); 449 final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1"); 450 final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2"); 451 final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3"); 452 final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4"); 453 454 // Add a valid peer 455 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 456 457 // Update peer config, not replicate all user tables 458 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 459 rpc.setReplicateAllUserTables(false); 460 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 461 462 Map<TableName, List<String>> tableCFs = new HashMap<>(); 463 try { 464 tableCFs.put(tableName3, null); 465 admin.removePeerTableCFs(ID_ONE, tableCFs); 466 assertTrue(false); 467 } catch (ReplicationException e) { 468 } 469 assertNull(admin.getPeerTableCFs(ID_ONE)); 470 471 tableCFs.clear(); 472 tableCFs.put(tableName1, null); 473 tableCFs.put(tableName2, new ArrayList<>()); 474 tableCFs.get(tableName2).add("cf1"); 475 admin.setPeerTableCFs(ID_ONE, tableCFs); 476 try { 477 tableCFs.clear(); 478 tableCFs.put(tableName3, null); 479 admin.removePeerTableCFs(ID_ONE, tableCFs); 480 assertTrue(false); 481 } catch (ReplicationException e) { 482 } 483 Map<TableName, List<String>> result = 484 ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 485 assertEquals(2, result.size()); 486 assertTrue("Should contain t1", result.containsKey(tableName1)); 487 assertTrue("Should contain t2", result.containsKey(tableName2)); 488 assertNull(result.get(tableName1)); 489 assertEquals(1, result.get(tableName2).size()); 490 assertEquals("cf1", result.get(tableName2).get(0)); 491 492 try { 493 tableCFs.clear(); 494 tableCFs.put(tableName1, new ArrayList<>()); 495 tableCFs.get(tableName1).add("f1"); 496 admin.removePeerTableCFs(ID_ONE, tableCFs); 497 assertTrue(false); 498 } catch (ReplicationException e) { 499 } 500 tableCFs.clear(); 501 tableCFs.put(tableName1, null); 502 admin.removePeerTableCFs(ID_ONE, tableCFs); 503 result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 504 assertEquals(1, result.size()); 505 assertEquals(1, result.get(tableName2).size()); 506 assertEquals("cf1", result.get(tableName2).get(0)); 507 508 try { 509 tableCFs.clear(); 510 tableCFs.put(tableName2, null); 511 admin.removePeerTableCFs(ID_ONE, tableCFs); 512 fail(); 513 } catch (ReplicationException e) { 514 } 515 tableCFs.clear(); 516 tableCFs.put(tableName2, new ArrayList<>()); 517 tableCFs.get(tableName2).add("cf1"); 518 admin.removePeerTableCFs(ID_ONE, tableCFs); 519 assertNull(admin.getPeerTableCFs(ID_ONE)); 520 521 tableCFs.clear(); 522 tableCFs.put(tableName4, new ArrayList<>()); 523 admin.setPeerTableCFs(ID_ONE, tableCFs); 524 admin.removePeerTableCFs(ID_ONE, tableCFs); 525 assertNull(admin.getPeerTableCFs(ID_ONE)); 526 527 admin.removePeer(ID_ONE); 528 } 529 530 @Test 531 public void testSetPeerNamespaces() throws Exception { 532 String ns1 = "ns1"; 533 String ns2 = "ns2"; 534 535 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 536 rpc.setClusterKey(KEY_ONE); 537 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 538 539 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 540 rpc.setReplicateAllUserTables(false); 541 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 542 543 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 544 Set<String> namespaces = new HashSet<>(); 545 namespaces.add(ns1); 546 namespaces.add(ns2); 547 rpc.setNamespaces(namespaces); 548 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 549 namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces(); 550 assertEquals(2, namespaces.size()); 551 assertTrue(namespaces.contains(ns1)); 552 assertTrue(namespaces.contains(ns2)); 553 554 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 555 namespaces = new HashSet<>(); 556 namespaces.add(ns1); 557 rpc.setNamespaces(namespaces); 558 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 559 namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces(); 560 assertEquals(1, namespaces.size()); 561 assertTrue(namespaces.contains(ns1)); 562 563 hbaseAdmin.removeReplicationPeer(ID_ONE); 564 } 565 566 @Test 567 public void testSetReplicateAllUserTables() throws Exception { 568 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 569 rpc.setClusterKey(KEY_ONE); 570 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 571 572 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 573 assertTrue(rpc.replicateAllUserTables()); 574 575 rpc.setReplicateAllUserTables(false); 576 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 577 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 578 assertFalse(rpc.replicateAllUserTables()); 579 580 rpc.setReplicateAllUserTables(true); 581 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 582 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 583 assertTrue(rpc.replicateAllUserTables()); 584 585 hbaseAdmin.removeReplicationPeer(ID_ONE); 586 } 587 588 @Test 589 public void testPeerExcludeNamespaces() throws Exception { 590 String ns1 = "ns1"; 591 String ns2 = "ns2"; 592 593 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 594 rpc.setClusterKey(KEY_ONE); 595 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 596 597 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 598 assertTrue(rpc.replicateAllUserTables()); 599 600 Set<String> namespaces = new HashSet<String>(); 601 namespaces.add(ns1); 602 namespaces.add(ns2); 603 rpc.setExcludeNamespaces(namespaces); 604 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 605 namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces(); 606 assertEquals(2, namespaces.size()); 607 assertTrue(namespaces.contains(ns1)); 608 assertTrue(namespaces.contains(ns2)); 609 610 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 611 namespaces = new HashSet<String>(); 612 namespaces.add(ns1); 613 rpc.setExcludeNamespaces(namespaces); 614 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 615 namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces(); 616 assertEquals(1, namespaces.size()); 617 assertTrue(namespaces.contains(ns1)); 618 619 hbaseAdmin.removeReplicationPeer(ID_ONE); 620 } 621 622 @Test 623 public void testPeerExcludeTableCFs() throws Exception { 624 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 625 rpc.setClusterKey(KEY_ONE); 626 TableName tab1 = TableName.valueOf("t1"); 627 TableName tab2 = TableName.valueOf("t2"); 628 TableName tab3 = TableName.valueOf("t3"); 629 TableName tab4 = TableName.valueOf("t4"); 630 631 // Add a valid peer 632 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 633 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 634 assertTrue(rpc.replicateAllUserTables()); 635 636 Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>(); 637 tableCFs.put(tab1, null); 638 rpc.setExcludeTableCFsMap(tableCFs); 639 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 640 Map<TableName, List<String>> result = 641 hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); 642 assertEquals(1, result.size()); 643 assertEquals(true, result.containsKey(tab1)); 644 assertNull(result.get(tab1)); 645 646 tableCFs.put(tab2, new ArrayList<String>()); 647 tableCFs.get(tab2).add("f1"); 648 rpc.setExcludeTableCFsMap(tableCFs); 649 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 650 result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); 651 assertEquals(2, result.size()); 652 assertTrue("Should contain t1", result.containsKey(tab1)); 653 assertTrue("Should contain t2", result.containsKey(tab2)); 654 assertNull(result.get(tab1)); 655 assertEquals(1, result.get(tab2).size()); 656 assertEquals("f1", result.get(tab2).get(0)); 657 658 tableCFs.clear(); 659 tableCFs.put(tab3, new ArrayList<String>()); 660 tableCFs.put(tab4, new ArrayList<String>()); 661 tableCFs.get(tab4).add("f1"); 662 tableCFs.get(tab4).add("f2"); 663 rpc.setExcludeTableCFsMap(tableCFs); 664 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 665 result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); 666 assertEquals(2, result.size()); 667 assertTrue("Should contain t3", result.containsKey(tab3)); 668 assertTrue("Should contain t4", result.containsKey(tab4)); 669 assertNull(result.get(tab3)); 670 assertEquals(2, result.get(tab4).size()); 671 assertEquals("f1", result.get(tab4).get(0)); 672 assertEquals("f2", result.get(tab4).get(1)); 673 674 hbaseAdmin.removeReplicationPeer(ID_ONE); 675 } 676 677 @Test 678 public void testPeerConfigConflict() throws Exception { 679 // Default replicate_all flag is true 680 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 681 rpc.setClusterKey(KEY_ONE); 682 683 String ns1 = "ns1"; 684 Set<String> namespaces = new HashSet<String>(); 685 namespaces.add(ns1); 686 687 TableName tab1 = TableName.valueOf("ns2:tabl"); 688 Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>(); 689 tableCfs.put(tab1, new ArrayList<String>()); 690 691 try { 692 rpc.setNamespaces(namespaces); 693 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 694 fail("Should throw Exception." 695 + " When replicate all flag is true, no need to config namespaces"); 696 } catch (IOException e) { 697 // OK 698 rpc.setNamespaces(null); 699 } 700 701 try { 702 rpc.setTableCFsMap(tableCfs); 703 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 704 fail("Should throw Exception." 705 + " When replicate all flag is true, no need to config table-cfs"); 706 } catch (IOException e) { 707 // OK 708 rpc.setTableCFsMap(null); 709 } 710 711 // Set replicate_all flag to true 712 rpc.setReplicateAllUserTables(false); 713 try { 714 rpc.setExcludeNamespaces(namespaces); 715 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 716 fail("Should throw Exception." 717 + " When replicate all flag is false, no need to config exclude namespaces"); 718 } catch (IOException e) { 719 // OK 720 rpc.setExcludeNamespaces(null); 721 } 722 723 try { 724 rpc.setExcludeTableCFsMap(tableCfs); 725 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 726 fail("Should throw Exception." 727 + " When replicate all flag is false, no need to config exclude table-cfs"); 728 } catch (IOException e) { 729 // OK 730 rpc.setExcludeTableCFsMap(null); 731 } 732 733 rpc.setNamespaces(namespaces); 734 rpc.setTableCFsMap(tableCfs); 735 // OK to add a new peer which replicate_all flag is false and with namespaces, table-cfs config 736 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 737 738 // Default replicate_all flag is true 739 ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); 740 rpc2.setClusterKey(KEY_SECOND); 741 rpc2.setExcludeNamespaces(namespaces); 742 rpc2.setExcludeTableCFsMap(tableCfs); 743 // OK to add a new peer which replicate_all flag is true and with exclude namespaces, exclude 744 // table-cfs config 745 hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2); 746 747 hbaseAdmin.removeReplicationPeer(ID_ONE); 748 hbaseAdmin.removeReplicationPeer(ID_SECOND); 749 } 750 751 @Test 752 public void testNamespacesAndTableCfsConfigConflict() throws Exception { 753 String ns1 = "ns1"; 754 String ns2 = "ns2"; 755 final TableName tableName1 = TableName.valueOf(ns1 + ":" + name.getMethodName()); 756 final TableName tableName2 = TableName.valueOf(ns2 + ":" + name.getMethodName() + "2"); 757 758 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 759 rpc.setClusterKey(KEY_ONE); 760 rpc.setReplicateAllUserTables(false); 761 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 762 763 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 764 Set<String> namespaces = new HashSet<String>(); 765 namespaces.add(ns1); 766 rpc.setNamespaces(namespaces); 767 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 768 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 769 try { 770 Map<TableName, List<String>> tableCfs = new HashMap<>(); 771 tableCfs.put(tableName1, new ArrayList<>()); 772 rpc.setTableCFsMap(tableCfs); 773 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 774 fail("Should throw ReplicationException" + " Because table " + tableName1 775 + " conflict with namespace " + ns1); 776 } catch (Exception e) { 777 // OK 778 } 779 780 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 781 Map<TableName, List<String>> tableCfs = new HashMap<>(); 782 tableCfs.put(tableName2, new ArrayList<>()); 783 rpc.setTableCFsMap(tableCfs); 784 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 785 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 786 try { 787 namespaces.clear(); 788 namespaces.add(ns2); 789 rpc.setNamespaces(namespaces); 790 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 791 fail("Should throw ReplicationException" + " Because namespace " + ns2 792 + " conflict with table " + tableName2); 793 } catch (Exception e) { 794 // OK 795 } 796 797 ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); 798 rpc2.setClusterKey(KEY_SECOND); 799 hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2); 800 801 rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); 802 Set<String> excludeNamespaces = new HashSet<String>(); 803 excludeNamespaces.add(ns1); 804 rpc2.setExcludeNamespaces(excludeNamespaces); 805 hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2); 806 rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); 807 try { 808 Map<TableName, List<String>> excludeTableCfs = new HashMap<>(); 809 excludeTableCfs.put(tableName1, new ArrayList<>()); 810 rpc2.setExcludeTableCFsMap(excludeTableCfs); 811 hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2); 812 fail("Should throw ReplicationException" + " Because exclude table " + tableName1 813 + " conflict with exclude namespace " + ns1); 814 } catch (Exception e) { 815 // OK 816 } 817 818 rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); 819 Map<TableName, List<String>> excludeTableCfs = new HashMap<>(); 820 excludeTableCfs.put(tableName2, new ArrayList<>()); 821 rpc2.setExcludeTableCFsMap(excludeTableCfs); 822 hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2); 823 rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); 824 try { 825 namespaces.clear(); 826 namespaces.add(ns2); 827 rpc2.setNamespaces(namespaces); 828 hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2); 829 fail("Should throw ReplicationException" + " Because exclude namespace " + ns2 830 + " conflict with exclude table " + tableName2); 831 } catch (Exception e) { 832 // OK 833 } 834 835 hbaseAdmin.removeReplicationPeer(ID_ONE); 836 hbaseAdmin.removeReplicationPeer(ID_SECOND); 837 } 838 839 @Test 840 public void testPeerBandwidth() throws Exception { 841 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 842 rpc.setClusterKey(KEY_ONE); 843 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 844 845 rpc = admin.getPeerConfig(ID_ONE); 846 assertEquals(0, rpc.getBandwidth()); 847 848 rpc.setBandwidth(2097152); 849 admin.updatePeerConfig(ID_ONE, rpc); 850 851 assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth()); 852 admin.removePeer(ID_ONE); 853 } 854 855 @Test 856 public void testPeerClusterKey() throws Exception { 857 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); 858 builder.setClusterKey(KEY_ONE); 859 hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); 860 861 try { 862 builder.setClusterKey(KEY_SECOND); 863 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); 864 fail("Change cluster key on an existing peer is not allowed"); 865 } catch (Exception e) { 866 // OK 867 } 868 } 869 870 @Test 871 public void testPeerReplicationEndpointImpl() throws Exception { 872 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); 873 builder.setClusterKey(KEY_ONE); 874 builder.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()); 875 hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); 876 877 try { 878 builder.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()); 879 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); 880 fail("Change replication endpoint implementation class on an existing peer is not allowed"); 881 } catch (Exception e) { 882 // OK 883 } 884 885 try { 886 builder = ReplicationPeerConfig.newBuilder(); 887 builder.setClusterKey(KEY_ONE); 888 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); 889 fail("Change replication endpoint implementation class on an existing peer is not allowed"); 890 } catch (Exception e) { 891 // OK 892 } 893 894 builder = ReplicationPeerConfig.newBuilder(); 895 builder.setClusterKey(KEY_SECOND); 896 hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); 897 898 try { 899 builder.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()); 900 hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); 901 fail("Change replication endpoint implementation class on an existing peer is not allowed"); 902 } catch (Exception e) { 903 // OK 904 } 905 } 906}