/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertTrue;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.TestBulkLoadReplication;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Testcase for HBASE-23098.
 * <p>
 * Adds a fourth cluster to the clusters used by {@link TestBulkLoadReplication} and attaches it to
 * cluster 1 through two peers, one scoped to a namespace and one scoped to a single table. The
 * test bulk loads data on cluster 1, checks that it shows up on cluster 4 only for the namespace
 * and table covered by those peers, and finally checks that no hfile references are left behind
 * in the replication queue storage of cluster 1.
 */
// LargeTest because spins up four clusters.
@Category({ ReplicationTests.class, LargeTests.class })
public final class TestNamespaceReplicationWithBulkLoadedData extends TestBulkLoadReplication {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestNamespaceReplicationWithBulkLoadedData.class);
  private static final Logger LOG =
    LoggerFactory.getLogger(TestNamespaceReplicationWithBulkLoadedData.class);

  private static final HBaseTestingUtil UTIL4 = new HBaseTestingUtil();
  private static final String PEER4_CLUSTER_ID = "peer4";
  // Peer id of the namespace-scoped peer 1 -> 4 (replicates namespace ns1).
  private static final String PEER4_NS = "ns_peer1";
  // Peer id of the table-scoped peer 1 -> 4 (replicates table ns2:t2_syncup).
  private static final String PEER4_NS_TABLE = "ns_peer2";

  private static final Configuration CONF4 = UTIL4.getConfiguration();

  private static final String NS1 = "ns1";
  private static final String NS2 = "ns2";

  private static final TableName NS1_TABLE = TableName.valueOf(NS1 + ":t1_syncup");
  private static final TableName NS2_TABLE = TableName.valueOf(NS2 + ":t2_syncup");

  /**
   * Configures the fourth cluster, lets the parent class bring up its own clusters, and then
   * starts the fourth one.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    setupBulkLoadConfigsForCluster(CONF4, PEER4_CLUSTER_ID);
    setupConfig(UTIL4, "/4");
    TestBulkLoadReplication.setUpBeforeClass();
    startFourthCluster();
  }

  /**
   * Starts the fourth mini cluster on the ZooKeeper cluster of UTIL1 and creates the shared test
   * table on it.
   */
  private static void startFourthCluster() throws Exception {
    LOG.info("Setup Zk to same one from UTIL1 and UTIL2 and UTIL3");
    UTIL4.setZkCluster(UTIL1.getZkCluster());
    UTIL4.startMiniCluster(NUM_SLAVES1);

    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();

    // Both the connection and the admin are created here, so both must be closed here.
    try (Connection connection4 = ConnectionFactory.createConnection(CONF4);
      Admin admin4 = connection4.getAdmin()) {
      admin4.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
    }
    UTIL4.waitUntilAllRegionsAssigned(tableName);
  }

  /**
   * Creates the ns1/ns2 namespaces and their tables on all four clusters, then adds the two
   * 1 -> 4 replication peers.
   */
  @Before
  @Override
  public void setUpBase() throws Exception {
    // According to the original author's note, "super.setUpBase()" already sets up the peers for
    // 1 <-> 2 <-> 3, and this test adds the fourth cluster, giving the following topology:
    //
    //       1
    //      / \
    //     2   4
    //    /
    //   3
    //
    // 1 -> 4 has two peers:
    //   ns_peer1: ns1 -> ns1 (the peer whose hfile-refs are validated),
    //             configured as NAMESPACES => ["ns1"]
    //   ns_peer2: ns2:t2_syncup -> ns2:t2_syncup,
    //             configured as TABLE_CFS => { "ns2:t2_syncup" => [] }
    super.setUpBase();

    // Create the namespaces and tables on every cluster.
    TableDescriptor table1 = newReplicatedTableDescriptor(NS1_TABLE);
    TableDescriptor table2 = newReplicatedTableDescriptor(NS2_TABLE);

    Admin admin1 = UTIL1.getAdmin();
    createNamespacesAndTables(admin1, table1, table2);
    createNamespacesAndTables(UTIL2.getAdmin(), table1, table2);
    createNamespacesAndTables(UTIL3.getAdmin(), table1, table2);
    createNamespacesAndTables(UTIL4.getAdmin(), table1, table2);

    // Peer ns_peer1, 1: ns1 -> 4: ns1. Shell equivalent:
    // add_peer 'ns_peer1', CLUSTER_KEY => "<cluster 4>", NAMESPACES => ["ns1"]
    Set<String> namespaces = new HashSet<>();
    namespaces.add(NS1);
    ReplicationPeerConfig rpc4Ns =
      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getRpcConnnectionURI())
        .setReplicateAllUserTables(false).setNamespaces(namespaces).build();
    admin1.addReplicationPeer(PEER4_NS, rpc4Ns);

    // Peer ns_peer2, 1: ns2:t2_syncup -> 4: ns2:t2_syncup. Shell equivalent:
    // add_peer 'ns_peer2', CLUSTER_KEY => "<cluster 4>", TABLE_CFS => { "ns2:t2_syncup" => [] }
    Map<TableName, List<String>> tableCFsMap = new HashMap<>();
    // A null column family list puts no per-family restriction on the table.
    tableCFsMap.put(NS2_TABLE, null);
    ReplicationPeerConfig rpc4NsTable =
      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getRpcConnnectionURI())
        .setReplicateAllUserTables(false).setTableCFsMap(tableCFsMap).build();
    admin1.addReplicationPeer(PEER4_NS_TABLE, rpc4NsTable);
  }

  /**
   * Drops the ns1/ns2 tables and namespaces from all four clusters and removes the two 1 -> 4
   * replication peers added by {@link #setUpBase()}.
   */
  @After
  @Override
  public void tearDownBase() throws Exception {
    super.tearDownBase();

    dropNamespacesAndTables(UTIL1.getAdmin());
    dropNamespacesAndTables(UTIL2.getAdmin());
    dropNamespacesAndTables(UTIL3.getAdmin());
    dropNamespacesAndTables(UTIL4.getAdmin());

    UTIL1.getAdmin().removeReplicationPeer(PEER4_NS);
    UTIL1.getAdmin().removeReplicationPeer(PEER4_NS_TABLE);
  }

  /**
   * Bulk loads rows on cluster 1 and verifies which clusters receive them, then verifies that the
   * replication queue storage of cluster 1 holds no hfile references.
   */
  @Test
  @Override
  public void testBulkLoadReplicationActiveActive() throws Exception {
    try (Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
      Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
      Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
      Table notPeerTable = UTIL4.getConnection().getTable(TestReplicationBase.tableName);
      Table ns1Table = UTIL4.getConnection().getTable(NS1_TABLE);
      Table ns2Table = UTIL4.getConnection().getTable(NS2_TABLE)) {

      // case1: The ns1 tables will be replicated to cluster4
      byte[] row = Bytes.toBytes("002_ns_peer");
      byte[] value = Bytes.toBytes("v2");
      bulkLoadOnCluster(ns1Table.getName(), row, value, UTIL1);
      waitForReplication(ns1Table, 1, NB_RETRIES);
      assertTableHasValue(ns1Table, row, value);

      // case2: The ns2:t2_syncup table will be replicated to cluster4.
      // Without the fix for HBASE-23098 the hfile-refs of ns_peer1 would pile up.
      row = Bytes.toBytes("003_ns_table_peer");
      value = Bytes.toBytes("v2");
      bulkLoadOnCluster(ns2Table.getName(), row, value, UTIL1);
      waitForReplication(ns2Table, 1, NB_RETRIES);
      assertTableHasValue(ns2Table, row, value);

      // case3: The test table will be replicated to cluster1, cluster2 and cluster3, but not to
      // cluster4, because no 1 -> 4 peer covers that table.
      row = Bytes.toBytes("001_nopeer");
      value = Bytes.toBytes("v1");
      assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable, peer2TestTable,
        peer3TestTable);
      assertTableNoValue(notPeerTable, row, value); // 1 -> 4, table is empty

      // Verify the hfile-refs of cluster 1: they are expected to be empty.
      ReplicationQueueStorage replicationQueueStorage = ReplicationStorageFactory
        .getReplicationQueueStorage(UTIL1.getConnection(), UTIL1.getConfiguration());
      Set<String> hfiles = replicationQueueStorage.getAllHFileRefs();
      assertTrue("Expected no pending hfile-refs but found: " + hfiles, hfiles.isEmpty());
    }
  }

  /**
   * Builds a descriptor with the replicated family {@code famName} (global replication scope) and
   * the plain family {@code noRepfamName}.
   */
  private static TableDescriptor newReplicatedTableDescriptor(TableName name) {
    return TableDescriptorBuilder.newBuilder(name)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
  }

  /** Creates namespaces ns1 and ns2, then the given tables in order, through {@code admin}. */
  private static void createNamespacesAndTables(Admin admin, TableDescriptor... tables)
    throws Exception {
    admin.createNamespace(NamespaceDescriptor.create(NS1).build());
    admin.createNamespace(NamespaceDescriptor.create(NS2).build());
    for (TableDescriptor table : tables) {
      admin.createTable(table);
    }
  }

  /** Disables and deletes both namespace tables, then deletes namespaces ns1 and ns2. */
  private static void dropNamespacesAndTables(Admin admin) throws Exception {
    admin.disableTable(NS1_TABLE);
    admin.deleteTable(NS1_TABLE);
    admin.disableTable(NS2_TABLE);
    admin.deleteTable(NS2_TABLE);
    admin.deleteNamespace(NS1);
    admin.deleteNamespace(NS2);
  }
}