001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client.replication; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseTestingUtil; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.Admin; 033import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 034import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; 035import org.apache.hadoop.hbase.testclassification.ClientTests; 036import org.apache.hadoop.hbase.testclassification.MediumTests; 037import org.apache.hadoop.hbase.util.Bytes; 038import org.junit.AfterClass; 039import org.junit.BeforeClass; 040import org.junit.ClassRule; 041import org.junit.Test; 042import org.junit.experimental.categories.Category; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046@Category({ MediumTests.class, ClientTests.class }) 047public class TestReplicationAdminForSyncReplication { 048 049 @ClassRule 050 public static final HBaseClassTestRule CLASS_RULE = 051 HBaseClassTestRule.forClass(TestReplicationAdminForSyncReplication.class); 052 053 private static final Logger LOG = 054 LoggerFactory.getLogger(TestReplicationAdminForSyncReplication.class); 055 056 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 057 058 private static Admin hbaseAdmin; 059 060 @BeforeClass 061 public static void setUpBeforeClass() throws Exception { 062 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 063 TEST_UTIL.startMiniCluster(); 064 hbaseAdmin = TEST_UTIL.getAdmin(); 065 } 066 067 @AfterClass 068 public static void tearDownAfterClass() throws Exception { 069 hbaseAdmin.close(); 070 TEST_UTIL.shutdownMiniCluster(); 071 } 072 073 @Test 074 public void testAddPeerWithSameTable() throws Exception { 075 TableName tableName = TableName.valueOf("testAddPeerWithSameTable"); 076 TEST_UTIL.createTable(tableName, Bytes.toBytes("family")); 077 078 boolean[] success = { true, true, true, true, true, true }; 079 Thread[] threads = new Thread[5]; 080 for (int i = 0; i < 5; i++) { 081 String peerId = "id" + i; 082 String clusterKey = TEST_UTIL.getZkConnectionURI() + "-test" + i; 083 int index = i; 084 threads[i] = new Thread(() -> { 085 try { 086 hbaseAdmin.addReplicationPeer(peerId, 087 buildSyncReplicationPeerConfig(clusterKey, tableName)); 088 } catch (IOException e) { 089 LOG.error("Failed to add replication peer " + peerId); 090 success[index] = false; 091 } 092 }); 093 } 094 for (int i = 0; i < 5; i++) { 095 threads[i].start(); 096 } 097 for (int i = 0; i < 5; i++) { 098 threads[i].join(); 099 } 100 101 int successCount = 0; 102 for (int i = 0; i < 5; i++) { 103 if (success[i]) { 104 successCount++; 105 } 106 } 107 assertEquals("Only one peer can be added successfully", 1, successCount); 108 } 109 110 private ReplicationPeerConfig buildSyncReplicationPeerConfig(String clusterKey, 111 TableName tableName) throws IOException { 112 Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL"); 113 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); 114 builder.setClusterKey(clusterKey); 115 builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(), 116 TEST_UTIL.getTestFileSystem().getWorkingDirectory()).toString()); 117 builder.setReplicateAllUserTables(false); 118 Map<TableName, List<String>> tableCfs = new HashMap<>(); 119 tableCfs.put(tableName, new ArrayList<>()); 120 builder.setTableCFsMap(tableCfs); 121 return builder.build(); 122 } 123}