001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.List;
032import java.util.Map;
033import java.util.Set;
034import java.util.concurrent.atomic.AtomicLong;
035import java.util.regex.Pattern;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.Admin;
044import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
045import org.apache.hadoop.hbase.replication.ReplicationException;
046import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
047import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
048import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
049import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
050import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
051import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest;
052import org.apache.hadoop.hbase.testclassification.ClientTests;
053import org.apache.hadoop.hbase.testclassification.MediumTests;
054import org.junit.After;
055import org.junit.AfterClass;
056import org.junit.BeforeClass;
057import org.junit.ClassRule;
058import org.junit.Rule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.junit.rules.TestName;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * Unit testing of ReplicationAdmin
067 */
068@Category({ MediumTests.class, ClientTests.class })
069public class TestReplicationAdmin {
070
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073    HBaseClassTestRule.forClass(TestReplicationAdmin.class);
074
075  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationAdmin.class);
076
077  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
078
079  private final String ID_ONE = "1";
080  private static String KEY_ONE;
081  private final String ID_SECOND = "2";
082  private static String KEY_SECOND;
083
084  private static ReplicationAdmin admin;
085  private static Admin hbaseAdmin;
086
087  @Rule
088  public TestName name = new TestName();
089
090  /**
091   * @throws java.lang.Exception
092   */
093  @BeforeClass
094  public static void setUpBeforeClass() throws Exception {
095    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
096    TEST_UTIL.startMiniCluster();
097    admin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
098    hbaseAdmin = TEST_UTIL.getAdmin();
099    KEY_ONE = TEST_UTIL.getClusterKey() + "-test1";
100    KEY_SECOND = TEST_UTIL.getClusterKey() + "-test2";
101  }
102
103  @AfterClass
104  public static void tearDownAfterClass() throws Exception {
105    if (admin != null) {
106      admin.close();
107    }
108    TEST_UTIL.shutdownMiniCluster();
109  }
110
111  @After
112  public void tearDown() throws Exception {
113    for (ReplicationPeerDescription desc : hbaseAdmin.listReplicationPeers()) {
114      hbaseAdmin.removeReplicationPeer(desc.getPeerId());
115    }
116    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
117      .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
118    for (ServerName serverName : queueStorage.getListOfReplicators()) {
119      for (String queue : queueStorage.getAllQueues(serverName)) {
120        queueStorage.removeQueue(serverName, queue);
121      }
122      queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
123    }
124  }
125
126  @Test
127  public void testConcurrentPeerOperations() throws Exception {
128    int threadNum = 5;
129    AtomicLong successCount = new AtomicLong(0);
130
131    // Test concurrent add peer operation
132    Thread[] addPeers = new Thread[threadNum];
133    for (int i = 0; i < threadNum; i++) {
134      addPeers[i] = new Thread(() -> {
135        try {
136          hbaseAdmin.addReplicationPeer(ID_ONE,
137            ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build());
138          successCount.incrementAndGet();
139        } catch (Exception e) {
140          LOG.debug("Got exception when add replication peer", e);
141        }
142      });
143      addPeers[i].start();
144    }
145    for (Thread addPeer : addPeers) {
146      addPeer.join();
147    }
148    assertEquals(1, successCount.get());
149
150    // Test concurrent remove peer operation
151    successCount.set(0);
152    Thread[] removePeers = new Thread[threadNum];
153    for (int i = 0; i < threadNum; i++) {
154      removePeers[i] = new Thread(() -> {
155        try {
156          hbaseAdmin.removeReplicationPeer(ID_ONE);
157          successCount.incrementAndGet();
158        } catch (Exception e) {
159          LOG.debug("Got exception when remove replication peer", e);
160        }
161      });
162      removePeers[i].start();
163    }
164    for (Thread removePeer : removePeers) {
165      removePeer.join();
166    }
167    assertEquals(1, successCount.get());
168
169    // Test concurrent add peer operation again
170    successCount.set(0);
171    addPeers = new Thread[threadNum];
172    for (int i = 0; i < threadNum; i++) {
173      addPeers[i] = new Thread(() -> {
174        try {
175          hbaseAdmin.addReplicationPeer(ID_ONE,
176            ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build());
177          successCount.incrementAndGet();
178        } catch (Exception e) {
179          LOG.debug("Got exception when add replication peer", e);
180        }
181      });
182      addPeers[i].start();
183    }
184    for (Thread addPeer : addPeers) {
185      addPeer.join();
186    }
187    assertEquals(1, successCount.get());
188  }
189
190  @Test
191  public void testAddInvalidPeer() {
192    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
193    builder.setClusterKey(KEY_ONE);
194    try {
195      String invalidPeerId = "1-2";
196      hbaseAdmin.addReplicationPeer(invalidPeerId, builder.build());
197      fail("Should fail as the peer id: " + invalidPeerId + " is invalid");
198    } catch (Exception e) {
199      // OK
200    }
201
202    try {
203      String invalidClusterKey = "2181:/hbase";
204      builder.setClusterKey(invalidClusterKey);
205      hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
206      fail("Should fail as the peer cluster key: " + invalidClusterKey + " is invalid");
207    } catch (Exception e) {
208      // OK
209    }
210  }
211
212  /**
213   * Simple testing of adding and removing peers, basically shows that all interactions with ZK work
214   */
215  @Test
216  public void testAddRemovePeer() throws Exception {
217    ReplicationPeerConfigBuilder rpc1 = ReplicationPeerConfig.newBuilder();
218    rpc1.setClusterKey(KEY_ONE);
219    ReplicationPeerConfigBuilder rpc2 = ReplicationPeerConfig.newBuilder();
220    rpc2.setClusterKey(KEY_SECOND);
221    // Add a valid peer
222    hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build());
223    // try adding the same (fails)
224    try {
225      hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build());
226    } catch (Exception e) {
227      // OK!
228    }
229    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
230    // Try to remove an inexisting peer
231    try {
232      hbaseAdmin.removeReplicationPeer(ID_SECOND);
233      fail();
234    } catch (Exception e) {
235      // OK!
236    }
237    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
238    // Add a second since multi-slave is supported
239    try {
240      hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2.build());
241    } catch (Exception e) {
242      fail();
243    }
244    assertEquals(2, hbaseAdmin.listReplicationPeers().size());
245    // Remove the first peer we added
246    hbaseAdmin.removeReplicationPeer(ID_ONE);
247    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
248    hbaseAdmin.removeReplicationPeer(ID_SECOND);
249    assertEquals(0, hbaseAdmin.listReplicationPeers().size());
250  }
251
252  @Test
253  public void testAddPeerWithState() throws Exception {
254    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
255    rpc1.setClusterKey(KEY_ONE);
256    hbaseAdmin.addReplicationPeer(ID_ONE, rpc1, true);
257    assertTrue(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_ONE)).get(0).isEnabled());
258    hbaseAdmin.removeReplicationPeer(ID_ONE);
259
260    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
261    rpc2.setClusterKey(KEY_SECOND);
262    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2, false);
263    assertFalse(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_SECOND)).get(0).isEnabled());
264    hbaseAdmin.removeReplicationPeer(ID_SECOND);
265  }
266
267  /**
268   * Tests that the peer configuration used by ReplicationAdmin contains all the peer's properties.
269   */
270  @Test
271  public void testPeerConfig() throws Exception {
272    ReplicationPeerConfig config = new ReplicationPeerConfig();
273    config.setClusterKey(KEY_ONE);
274    config.getConfiguration().put("key1", "value1");
275    config.getConfiguration().put("key2", "value2");
276    hbaseAdmin.addReplicationPeer(ID_ONE, config);
277
278    List<ReplicationPeerDescription> peers = hbaseAdmin.listReplicationPeers();
279    assertEquals(1, peers.size());
280    ReplicationPeerDescription peerOne = peers.get(0);
281    assertNotNull(peerOne);
282    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
283    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));
284
285    hbaseAdmin.removeReplicationPeer(ID_ONE);
286  }
287
288  @Test
289  public void testAddPeerWithUnDeletedQueues() throws Exception {
290    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
291    rpc1.setClusterKey(KEY_ONE);
292    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
293    rpc2.setClusterKey(KEY_SECOND);
294    Configuration conf = TEST_UTIL.getConfiguration();
295    ReplicationQueueStorage queueStorage =
296      ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), conf);
297
298    ServerName serverName = ServerName.valueOf("server1", 8000, 1234);
299    // add queue for ID_ONE
300    queueStorage.addWAL(serverName, ID_ONE, "file1");
301    try {
302      admin.addPeer(ID_ONE, rpc1, null);
303      fail();
304    } catch (Exception e) {
305      // OK!
306    }
307    queueStorage.removeQueue(serverName, ID_ONE);
308    assertEquals(0, queueStorage.getAllQueues(serverName).size());
309
310    // add recovered queue for ID_ONE
311    queueStorage.addWAL(serverName, ID_ONE + "-server2", "file1");
312    try {
313      admin.addPeer(ID_ONE, rpc2, null);
314      fail();
315    } catch (Exception e) {
316      // OK!
317    }
318  }
319
320  /**
321   * basic checks that when we add a peer that it is enabled, and that we can disable
322   */
323  @Test
324  public void testEnableDisable() throws Exception {
325    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
326    rpc1.setClusterKey(KEY_ONE);
327    admin.addPeer(ID_ONE, rpc1, null);
328    assertEquals(1, admin.getPeersCount());
329    assertTrue(admin.getPeerState(ID_ONE));
330    admin.disablePeer(ID_ONE);
331
332    assertFalse(admin.getPeerState(ID_ONE));
333    try {
334      admin.getPeerState(ID_SECOND);
335    } catch (ReplicationPeerNotFoundException e) {
336      // OK!
337    }
338    admin.removePeer(ID_ONE);
339  }
340
341  @Test
342  public void testAppendPeerTableCFs() throws Exception {
343    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
344    rpc.setClusterKey(KEY_ONE);
345    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
346    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
347    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
348    final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
349    final TableName tableName5 = TableName.valueOf(name.getMethodName() + "t5");
350    final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6");
351
352    // Add a valid peer
353    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
354
355    // Update peer config, not replicate all user tables
356    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
357    rpc.setReplicateAllUserTables(false);
358    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
359
360    Map<TableName, List<String>> tableCFs = new HashMap<>();
361    tableCFs.put(tableName1, null);
362    admin.appendPeerTableCFs(ID_ONE, tableCFs);
363    Map<TableName, List<String>> result =
364      ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
365    assertEquals(1, result.size());
366    assertEquals(true, result.containsKey(tableName1));
367    assertNull(result.get(tableName1));
368
369    // append table t2 to replication
370    tableCFs.clear();
371    tableCFs.put(tableName2, null);
372    admin.appendPeerTableCFs(ID_ONE, tableCFs);
373    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
374    assertEquals(2, result.size());
375    assertTrue("Should contain t1", result.containsKey(tableName1));
376    assertTrue("Should contain t2", result.containsKey(tableName2));
377    assertNull(result.get(tableName1));
378    assertNull(result.get(tableName2));
379
380    // append table column family: f1 of t3 to replication
381    tableCFs.clear();
382    tableCFs.put(tableName3, new ArrayList<>());
383    tableCFs.get(tableName3).add("f1");
384    admin.appendPeerTableCFs(ID_ONE, tableCFs);
385    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
386    assertEquals(3, result.size());
387    assertTrue("Should contain t1", result.containsKey(tableName1));
388    assertTrue("Should contain t2", result.containsKey(tableName2));
389    assertTrue("Should contain t3", result.containsKey(tableName3));
390    assertNull(result.get(tableName1));
391    assertNull(result.get(tableName2));
392    assertEquals(1, result.get(tableName3).size());
393    assertEquals("f1", result.get(tableName3).get(0));
394
395    tableCFs.clear();
396    tableCFs.put(tableName4, new ArrayList<>());
397    tableCFs.get(tableName4).add("f1");
398    tableCFs.get(tableName4).add("f2");
399    admin.appendPeerTableCFs(ID_ONE, tableCFs);
400    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
401    assertEquals(4, result.size());
402    assertTrue("Should contain t1", result.containsKey(tableName1));
403    assertTrue("Should contain t2", result.containsKey(tableName2));
404    assertTrue("Should contain t3", result.containsKey(tableName3));
405    assertTrue("Should contain t4", result.containsKey(tableName4));
406    assertNull(result.get(tableName1));
407    assertNull(result.get(tableName2));
408    assertEquals(1, result.get(tableName3).size());
409    assertEquals("f1", result.get(tableName3).get(0));
410    assertEquals(2, result.get(tableName4).size());
411    assertEquals("f1", result.get(tableName4).get(0));
412    assertEquals("f2", result.get(tableName4).get(1));
413
414    // append "table5" => [], then append "table5" => ["f1"]
415    tableCFs.clear();
416    tableCFs.put(tableName5, new ArrayList<>());
417    admin.appendPeerTableCFs(ID_ONE, tableCFs);
418    tableCFs.clear();
419    tableCFs.put(tableName5, new ArrayList<>());
420    tableCFs.get(tableName5).add("f1");
421    admin.appendPeerTableCFs(ID_ONE, tableCFs);
422    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
423    assertEquals(5, result.size());
424    assertTrue("Should contain t5", result.containsKey(tableName5));
425    // null means replication all cfs of tab5
426    assertNull(result.get(tableName5));
427
428    // append "table6" => ["f1"], then append "table6" => []
429    tableCFs.clear();
430    tableCFs.put(tableName6, new ArrayList<>());
431    tableCFs.get(tableName6).add("f1");
432    admin.appendPeerTableCFs(ID_ONE, tableCFs);
433    tableCFs.clear();
434    tableCFs.put(tableName6, new ArrayList<>());
435    admin.appendPeerTableCFs(ID_ONE, tableCFs);
436    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
437    assertEquals(6, result.size());
438    assertTrue("Should contain t6", result.containsKey(tableName6));
439    // null means replication all cfs of tab6
440    assertNull(result.get(tableName6));
441
442    admin.removePeer(ID_ONE);
443  }
444
445  @Test
446  public void testRemovePeerTableCFs() throws Exception {
447    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
448    rpc.setClusterKey(KEY_ONE);
449    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
450    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
451    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
452    final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
453
454    // Add a valid peer
455    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
456
457    // Update peer config, not replicate all user tables
458    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
459    rpc.setReplicateAllUserTables(false);
460    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
461
462    Map<TableName, List<String>> tableCFs = new HashMap<>();
463    try {
464      tableCFs.put(tableName3, null);
465      admin.removePeerTableCFs(ID_ONE, tableCFs);
466      assertTrue(false);
467    } catch (ReplicationException e) {
468    }
469    assertNull(admin.getPeerTableCFs(ID_ONE));
470
471    tableCFs.clear();
472    tableCFs.put(tableName1, null);
473    tableCFs.put(tableName2, new ArrayList<>());
474    tableCFs.get(tableName2).add("cf1");
475    admin.setPeerTableCFs(ID_ONE, tableCFs);
476    try {
477      tableCFs.clear();
478      tableCFs.put(tableName3, null);
479      admin.removePeerTableCFs(ID_ONE, tableCFs);
480      assertTrue(false);
481    } catch (ReplicationException e) {
482    }
483    Map<TableName, List<String>> result =
484      ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
485    assertEquals(2, result.size());
486    assertTrue("Should contain t1", result.containsKey(tableName1));
487    assertTrue("Should contain t2", result.containsKey(tableName2));
488    assertNull(result.get(tableName1));
489    assertEquals(1, result.get(tableName2).size());
490    assertEquals("cf1", result.get(tableName2).get(0));
491
492    try {
493      tableCFs.clear();
494      tableCFs.put(tableName1, new ArrayList<>());
495      tableCFs.get(tableName1).add("f1");
496      admin.removePeerTableCFs(ID_ONE, tableCFs);
497      assertTrue(false);
498    } catch (ReplicationException e) {
499    }
500    tableCFs.clear();
501    tableCFs.put(tableName1, null);
502    admin.removePeerTableCFs(ID_ONE, tableCFs);
503    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
504    assertEquals(1, result.size());
505    assertEquals(1, result.get(tableName2).size());
506    assertEquals("cf1", result.get(tableName2).get(0));
507
508    try {
509      tableCFs.clear();
510      tableCFs.put(tableName2, null);
511      admin.removePeerTableCFs(ID_ONE, tableCFs);
512      fail();
513    } catch (ReplicationException e) {
514    }
515    tableCFs.clear();
516    tableCFs.put(tableName2, new ArrayList<>());
517    tableCFs.get(tableName2).add("cf1");
518    admin.removePeerTableCFs(ID_ONE, tableCFs);
519    assertNull(admin.getPeerTableCFs(ID_ONE));
520
521    tableCFs.clear();
522    tableCFs.put(tableName4, new ArrayList<>());
523    admin.setPeerTableCFs(ID_ONE, tableCFs);
524    admin.removePeerTableCFs(ID_ONE, tableCFs);
525    assertNull(admin.getPeerTableCFs(ID_ONE));
526
527    admin.removePeer(ID_ONE);
528  }
529
530  @Test
531  public void testSetPeerNamespaces() throws Exception {
532    String ns1 = "ns1";
533    String ns2 = "ns2";
534
535    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
536    rpc.setClusterKey(KEY_ONE);
537    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
538
539    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
540    rpc.setReplicateAllUserTables(false);
541    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
542
543    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
544    Set<String> namespaces = new HashSet<>();
545    namespaces.add(ns1);
546    namespaces.add(ns2);
547    rpc.setNamespaces(namespaces);
548    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
549    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
550    assertEquals(2, namespaces.size());
551    assertTrue(namespaces.contains(ns1));
552    assertTrue(namespaces.contains(ns2));
553
554    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
555    namespaces = new HashSet<>();
556    namespaces.add(ns1);
557    rpc.setNamespaces(namespaces);
558    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
559    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
560    assertEquals(1, namespaces.size());
561    assertTrue(namespaces.contains(ns1));
562
563    hbaseAdmin.removeReplicationPeer(ID_ONE);
564  }
565
566  @Test
567  public void testSetReplicateAllUserTables() throws Exception {
568    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
569    rpc.setClusterKey(KEY_ONE);
570    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
571
572    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
573    assertTrue(rpc.replicateAllUserTables());
574
575    rpc.setReplicateAllUserTables(false);
576    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
577    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
578    assertFalse(rpc.replicateAllUserTables());
579
580    rpc.setReplicateAllUserTables(true);
581    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
582    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
583    assertTrue(rpc.replicateAllUserTables());
584
585    hbaseAdmin.removeReplicationPeer(ID_ONE);
586  }
587
588  @Test
589  public void testPeerExcludeNamespaces() throws Exception {
590    String ns1 = "ns1";
591    String ns2 = "ns2";
592
593    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
594    rpc.setClusterKey(KEY_ONE);
595    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
596
597    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
598    assertTrue(rpc.replicateAllUserTables());
599
600    Set<String> namespaces = new HashSet<String>();
601    namespaces.add(ns1);
602    namespaces.add(ns2);
603    rpc.setExcludeNamespaces(namespaces);
604    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
605    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
606    assertEquals(2, namespaces.size());
607    assertTrue(namespaces.contains(ns1));
608    assertTrue(namespaces.contains(ns2));
609
610    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
611    namespaces = new HashSet<String>();
612    namespaces.add(ns1);
613    rpc.setExcludeNamespaces(namespaces);
614    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
615    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
616    assertEquals(1, namespaces.size());
617    assertTrue(namespaces.contains(ns1));
618
619    hbaseAdmin.removeReplicationPeer(ID_ONE);
620  }
621
622  @Test
623  public void testPeerExcludeTableCFs() throws Exception {
624    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
625    rpc.setClusterKey(KEY_ONE);
626    TableName tab1 = TableName.valueOf("t1");
627    TableName tab2 = TableName.valueOf("t2");
628    TableName tab3 = TableName.valueOf("t3");
629    TableName tab4 = TableName.valueOf("t4");
630
631    // Add a valid peer
632    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
633    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
634    assertTrue(rpc.replicateAllUserTables());
635
636    Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>();
637    tableCFs.put(tab1, null);
638    rpc.setExcludeTableCFsMap(tableCFs);
639    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
640    Map<TableName, List<String>> result =
641      hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
642    assertEquals(1, result.size());
643    assertEquals(true, result.containsKey(tab1));
644    assertNull(result.get(tab1));
645
646    tableCFs.put(tab2, new ArrayList<String>());
647    tableCFs.get(tab2).add("f1");
648    rpc.setExcludeTableCFsMap(tableCFs);
649    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
650    result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
651    assertEquals(2, result.size());
652    assertTrue("Should contain t1", result.containsKey(tab1));
653    assertTrue("Should contain t2", result.containsKey(tab2));
654    assertNull(result.get(tab1));
655    assertEquals(1, result.get(tab2).size());
656    assertEquals("f1", result.get(tab2).get(0));
657
658    tableCFs.clear();
659    tableCFs.put(tab3, new ArrayList<String>());
660    tableCFs.put(tab4, new ArrayList<String>());
661    tableCFs.get(tab4).add("f1");
662    tableCFs.get(tab4).add("f2");
663    rpc.setExcludeTableCFsMap(tableCFs);
664    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
665    result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
666    assertEquals(2, result.size());
667    assertTrue("Should contain t3", result.containsKey(tab3));
668    assertTrue("Should contain t4", result.containsKey(tab4));
669    assertNull(result.get(tab3));
670    assertEquals(2, result.get(tab4).size());
671    assertEquals("f1", result.get(tab4).get(0));
672    assertEquals("f2", result.get(tab4).get(1));
673
674    hbaseAdmin.removeReplicationPeer(ID_ONE);
675  }
676
677  @Test
678  public void testPeerConfigConflict() throws Exception {
679    // Default replicate_all flag is true
680    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
681    rpc.setClusterKey(KEY_ONE);
682
683    String ns1 = "ns1";
684    Set<String> namespaces = new HashSet<String>();
685    namespaces.add(ns1);
686
687    TableName tab1 = TableName.valueOf("ns2:tabl");
688    Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
689    tableCfs.put(tab1, new ArrayList<String>());
690
691    try {
692      rpc.setNamespaces(namespaces);
693      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
694      fail("Should throw Exception."
695        + " When replicate all flag is true, no need to config namespaces");
696    } catch (IOException e) {
697      // OK
698      rpc.setNamespaces(null);
699    }
700
701    try {
702      rpc.setTableCFsMap(tableCfs);
703      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
704      fail("Should throw Exception."
705        + " When replicate all flag is true, no need to config table-cfs");
706    } catch (IOException e) {
707      // OK
708      rpc.setTableCFsMap(null);
709    }
710
711    // Set replicate_all flag to true
712    rpc.setReplicateAllUserTables(false);
713    try {
714      rpc.setExcludeNamespaces(namespaces);
715      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
716      fail("Should throw Exception."
717        + " When replicate all flag is false, no need to config exclude namespaces");
718    } catch (IOException e) {
719      // OK
720      rpc.setExcludeNamespaces(null);
721    }
722
723    try {
724      rpc.setExcludeTableCFsMap(tableCfs);
725      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
726      fail("Should throw Exception."
727        + " When replicate all flag is false, no need to config exclude table-cfs");
728    } catch (IOException e) {
729      // OK
730      rpc.setExcludeTableCFsMap(null);
731    }
732
733    rpc.setNamespaces(namespaces);
734    rpc.setTableCFsMap(tableCfs);
735    // OK to add a new peer which replicate_all flag is false and with namespaces, table-cfs config
736    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
737
738    // Default replicate_all flag is true
739    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
740    rpc2.setClusterKey(KEY_SECOND);
741    rpc2.setExcludeNamespaces(namespaces);
742    rpc2.setExcludeTableCFsMap(tableCfs);
743    // OK to add a new peer which replicate_all flag is true and with exclude namespaces, exclude
744    // table-cfs config
745    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);
746
747    hbaseAdmin.removeReplicationPeer(ID_ONE);
748    hbaseAdmin.removeReplicationPeer(ID_SECOND);
749  }
750
751  @Test
752  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
753    String ns1 = "ns1";
754    String ns2 = "ns2";
755    final TableName tableName1 = TableName.valueOf(ns1 + ":" + name.getMethodName());
756    final TableName tableName2 = TableName.valueOf(ns2 + ":" + name.getMethodName() + "2");
757
758    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
759    rpc.setClusterKey(KEY_ONE);
760    rpc.setReplicateAllUserTables(false);
761    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
762
763    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
764    Set<String> namespaces = new HashSet<String>();
765    namespaces.add(ns1);
766    rpc.setNamespaces(namespaces);
767    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
768    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
769    try {
770      Map<TableName, List<String>> tableCfs = new HashMap<>();
771      tableCfs.put(tableName1, new ArrayList<>());
772      rpc.setTableCFsMap(tableCfs);
773      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
774      fail("Should throw ReplicationException" + " Because table " + tableName1
775        + " conflict with namespace " + ns1);
776    } catch (Exception e) {
777      // OK
778    }
779
780    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
781    Map<TableName, List<String>> tableCfs = new HashMap<>();
782    tableCfs.put(tableName2, new ArrayList<>());
783    rpc.setTableCFsMap(tableCfs);
784    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
785    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
786    try {
787      namespaces.clear();
788      namespaces.add(ns2);
789      rpc.setNamespaces(namespaces);
790      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
791      fail("Should throw ReplicationException" + " Because namespace " + ns2
792        + " conflict with table " + tableName2);
793    } catch (Exception e) {
794      // OK
795    }
796
797    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
798    rpc2.setClusterKey(KEY_SECOND);
799    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);
800
801    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
802    Set<String> excludeNamespaces = new HashSet<String>();
803    excludeNamespaces.add(ns1);
804    rpc2.setExcludeNamespaces(excludeNamespaces);
805    hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
806    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
807    try {
808      Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
809      excludeTableCfs.put(tableName1, new ArrayList<>());
810      rpc2.setExcludeTableCFsMap(excludeTableCfs);
811      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
812      fail("Should throw ReplicationException" + " Because exclude table " + tableName1
813        + " conflict with exclude namespace " + ns1);
814    } catch (Exception e) {
815      // OK
816    }
817
818    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
819    Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
820    excludeTableCfs.put(tableName2, new ArrayList<>());
821    rpc2.setExcludeTableCFsMap(excludeTableCfs);
822    hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
823    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
824    try {
825      namespaces.clear();
826      namespaces.add(ns2);
827      rpc2.setNamespaces(namespaces);
828      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
829      fail("Should throw ReplicationException" + " Because exclude namespace " + ns2
830        + " conflict with exclude table " + tableName2);
831    } catch (Exception e) {
832      // OK
833    }
834
835    hbaseAdmin.removeReplicationPeer(ID_ONE);
836    hbaseAdmin.removeReplicationPeer(ID_SECOND);
837  }
838
839  @Test
840  public void testPeerBandwidth() throws Exception {
841    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
842    rpc.setClusterKey(KEY_ONE);
843    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
844
845    rpc = admin.getPeerConfig(ID_ONE);
846    assertEquals(0, rpc.getBandwidth());
847
848    rpc.setBandwidth(2097152);
849    admin.updatePeerConfig(ID_ONE, rpc);
850
851    assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth());
852    admin.removePeer(ID_ONE);
853  }
854
855  @Test
856  public void testPeerClusterKey() throws Exception {
857    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
858    builder.setClusterKey(KEY_ONE);
859    hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
860
861    try {
862      builder.setClusterKey(KEY_SECOND);
863      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
864      fail("Change cluster key on an existing peer is not allowed");
865    } catch (Exception e) {
866      // OK
867    }
868  }
869
870  @Test
871  public void testPeerReplicationEndpointImpl() throws Exception {
872    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
873    builder.setClusterKey(KEY_ONE);
874    builder.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName());
875    hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
876
877    try {
878      builder.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName());
879      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
880      fail("Change replication endpoint implementation class on an existing peer is not allowed");
881    } catch (Exception e) {
882      // OK
883    }
884
885    try {
886      builder = ReplicationPeerConfig.newBuilder();
887      builder.setClusterKey(KEY_ONE);
888      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
889      fail("Change replication endpoint implementation class on an existing peer is not allowed");
890    } catch (Exception e) {
891      // OK
892    }
893
894    builder = ReplicationPeerConfig.newBuilder();
895    builder.setClusterKey(KEY_SECOND);
896    hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
897
898    try {
899      builder.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName());
900      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
901      fail("Change replication endpoint implementation class on an existing peer is not allowed");
902    } catch (Exception e) {
903      // OK
904    }
905  }
906}