/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;

/**
 * Tests per-peer table/column-family ("table-CF") replication configuration.
 * <p>
 * Two tests exercise the parsing and conversion helpers of {@link ReplicationPeerConfigUtil}
 * without touching a cluster. The third starts three mini clusters that share a single mini
 * ZooKeeper cluster (under the znode parents {@code /1}, {@code /2} and {@code /3}), registers
 * clusters 2 and 3 as replication peers of cluster 1, and checks which puts and deletes reach
 * which peer for a given table-CF configuration.
 */
@Category({ FlakeyTests.class, LargeTests.class })
public class TestPerTableCFReplication {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestPerTableCFReplication.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestPerTableCFReplication.class);

  private static Configuration conf1;
  private static Configuration conf2;
  private static Configuration conf3;

  private static HBaseTestingUtil utility1;
  private static HBaseTestingUtil utility2;
  private static HBaseTestingUtil utility3;

  /** Pause, in milliseconds, between two polls of the target tables. */
  private static final long SLEEP_TIME = 500;
  /** Upper bound on the polling loop counter; the final iteration fails the test. */
  private static final int NB_RETRIES = 100;

  private static final TableName tableName = TableName.valueOf("test");
  private static final TableName tabAName = TableName.valueOf("TA");
  private static final TableName tabBName = TableName.valueOf("TB");
  private static final TableName tabCName = TableName.valueOf("TC");
  private static final byte[] famName = Bytes.toBytes("f");
  private static final byte[] f1Name = Bytes.toBytes("f1");
  private static final byte[] f2Name = Bytes.toBytes("f2");
  private static final byte[] f3Name = Bytes.toBytes("f3");
  private static final byte[] row1 = Bytes.toBytes("row1");
  private static final byte[] row2 = Bytes.toBytes("row2");
  private static final byte[] noRepfamName = Bytes.toBytes("norep");
  private static final byte[] val = Bytes.toBytes("myval");

  private static TableDescriptor table;
  private static TableDescriptor tabA;
  private static TableDescriptor tabB;
  private static TableDescriptor tabC;

  @Rule
  public TestName name = new TestName();

  /**
   * Builds the three cluster configurations and table descriptors, then starts the three mini
   * clusters. Clusters 2 and 3 reuse the mini ZooKeeper cluster started for cluster 1.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf1 = HBaseConfiguration.create();
    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
    // smaller block size and capacity to trigger more operations
    // and test them
    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
    conf1.setInt("replication.source.size.capacity", 1024);
    conf1.setLong("replication.source.sleepforretries", 100);
    conf1.setInt("hbase.regionserver.maxlogs", 10);
    conf1.setLong("hbase.master.logcleaner.ttl", 10);
    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
      "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");

    utility1 = new HBaseTestingUtil(conf1);
    utility1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
    // NOTE(review): the ZKWatcher instances created in this method are never closed; presumably
    // they are only constructed for their side effect on the base znodes - confirm whether they
    // can be closed right away.
    new ZKWatcher(conf1, "cluster1", null, true);

    conf2 = new Configuration(conf1);
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");

    conf3 = new Configuration(conf1);
    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");

    utility2 = new HBaseTestingUtil(conf2);
    utility2.setZkCluster(miniZK);
    // Identifier fixed: this watcher belongs to cluster 2 (it was labelled "cluster3").
    new ZKWatcher(conf2, "cluster2", null, true);

    utility3 = new HBaseTestingUtil(conf3);
    utility3.setZkCluster(miniZK);
    new ZKWatcher(conf3, "cluster3", null, true);

    table = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();

    tabA = TableDescriptorBuilder.newBuilder(tabAName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f3Name)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .build();

    tabB = TableDescriptorBuilder.newBuilder(tabBName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f3Name)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .build();

    tabC = TableDescriptorBuilder.newBuilder(tabCName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f3Name)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .build();

    utility1.startMiniCluster();
    utility2.startMiniCluster();
    utility3.startMiniCluster();
  }

  /**
   * Shuts the three mini clusters down in reverse start order. The calls are chained with
   * try/finally so that a failure in one shutdown does not skip the remaining ones.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    try {
      utility3.shutdownMiniCluster();
    } finally {
      try {
        utility2.shutdownMiniCluster();
      } finally {
        utility1.shutdownMiniCluster();
      }
    }
  }

  /**
   * Checks {@link ReplicationPeerConfigUtil#parseTableCFsFromConfig(String)} against null/blank
   * input, single-table and multi-table strings, redundant delimiters and malformed entries.
   */
  @Test
  public void testParseTableCFsFromConfig() {
    Map<TableName, List<String>> tabCFsMap = null;

    // 1. null or empty string, result should be null
    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(null);
    assertNull(tabCFsMap);

    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig("");
    assertNull(tabCFsMap);

    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig("   ");
    assertNull(tabCFsMap);

    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");

    // 2. single table: "tableName1" / "tableName2:cf1" / "tableName3:cf1,cf3"
    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1.getNameAsString());
    assertEquals(1, tabCFsMap.size()); // only one table
    assertTrue(tabCFsMap.containsKey(tableName1)); // its table name is "tableName1"
    assertFalse(tabCFsMap.containsKey(tableName2)); // not other table
    assertNull(tabCFsMap.get(tableName1)); // null cf-list

    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName2 + ":cf1");
    assertEquals(1, tabCFsMap.size()); // only one table
    assertTrue(tabCFsMap.containsKey(tableName2)); // its table name is "tableName2"
    assertFalse(tabCFsMap.containsKey(tableName1)); // not other table
    assertEquals(1, tabCFsMap.get(tableName2).size()); // cf-list contains only 1 cf
    assertEquals("cf1", tabCFsMap.get(tableName2).get(0)); // the only cf is "cf1"

    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName3 + " : cf1 , cf3");
    assertEquals(1, tabCFsMap.size()); // only one table
    assertTrue(tabCFsMap.containsKey(tableName3)); // its table name is "tableName3"
    assertFalse(tabCFsMap.containsKey(tableName1)); // not other table
    assertEquals(2, tabCFsMap.get(tableName3).size()); // cf-list contains 2 cf
    assertTrue(tabCFsMap.get(tableName3).contains("cf1")); // contains "cf1"
    assertTrue(tabCFsMap.get(tableName3).contains("cf3")); // contains "cf3"

    // 3. multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
      tableName1 + " ; " + tableName2 + ":cf1 ; " + tableName3 + ":cf1,cf3");
    // 3.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
    assertEquals(3, tabCFsMap.size());
    assertTrue(tabCFsMap.containsKey(tableName1));
    assertTrue(tabCFsMap.containsKey(tableName2));
    assertTrue(tabCFsMap.containsKey(tableName3));
    // 3.2 table "tableName1" : null cf-list
    assertNull(tabCFsMap.get(tableName1));
    // 3.3 table "tableName2" : cf-list contains a single cf "cf1"
    assertEquals(1, tabCFsMap.get(tableName2).size());
    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
    // 3.4 table "tableName3" : cf-list contains "cf1" and "cf3"
    assertEquals(2, tabCFsMap.get(tableName3).size());
    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));

    // 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated
    // still use the example of multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
      tableName1 + " ; ; " + tableName2 + ":cf1 ; " + tableName3 + ":cf1,,cf3 ;");
    // 4.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
    assertEquals(3, tabCFsMap.size());
    assertTrue(tabCFsMap.containsKey(tableName1));
    assertTrue(tabCFsMap.containsKey(tableName2));
    assertTrue(tabCFsMap.containsKey(tableName3));
    // 4.2 table "tableName1" : null cf-list
    assertNull(tabCFsMap.get(tableName1));
    // 4.3 table "tableName2" : cf-list contains a single cf "cf1"
    assertEquals(1, tabCFsMap.get(tableName2).size());
    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
    // 4.4 table "tableName3" : cf-list contains "cf1" and "cf3"
    assertEquals(2, tabCFsMap.get(tableName3).size());
    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));

    // 5. invalid format "tableName1:tt:cf1 ; tableName2::cf1 ; tableName3:cf1,cf3"
    // "tableName1:tt:cf1" and "tableName2::cf1" are invalid and will be ignored totally
    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
      tableName1 + ":tt:cf1 ; " + tableName2 + "::cf1 ; " + tableName3 + ":cf1,cf3");
    // 5.1 no "tableName1" and "tableName2", only "tableName3"
    assertEquals(1, tabCFsMap.size()); // only one table
    assertFalse(tabCFsMap.containsKey(tableName1));
    assertFalse(tabCFsMap.containsKey(tableName2));
    assertTrue(tabCFsMap.containsKey(tableName3));
    // 5.2 table "tableName3" : cf-list contains "cf1" and "cf3"
    assertEquals(2, tabCFsMap.get(tableName3).size());
    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
  }

  /**
   * Checks the map to {@link ReplicationProtos.TableCF} array conversion of
   * {@link ReplicationPeerConfigUtil}, the lookup by table name, and the conversion back to a
   * map.
   */
  @Test
  public void testTableCFsHelperConverter() {

    ReplicationProtos.TableCF[] tableCFs = null;
    Map<TableName, List<String>> tabCFsMap = null;

    // 1. a null map converts to null, an empty map to an empty array
    assertNull(ReplicationPeerConfigUtil.convert(tabCFsMap));

    tabCFsMap = new HashMap<>();
    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
    assertEquals(0, tableCFs.length);

    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");

    // 2. single table: "tableName1" / "tableName2:cf1" / "tableName3:cf1,cf3"
    tabCFsMap.clear();
    tabCFsMap.put(tableName1, null);
    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
    assertEquals(1, tableCFs.length); // only one table
    assertEquals(tableName1.toString(), tableCFs[0].getTableName().getQualifier().toStringUtf8());
    assertEquals(0, tableCFs[0].getFamiliesCount());

    tabCFsMap.clear();
    tabCFsMap.put(tableName2, new ArrayList<>());
    tabCFsMap.get(tableName2).add("cf1");
    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
    assertEquals(1, tableCFs.length); // only one table
    assertEquals(tableName2.toString(), tableCFs[0].getTableName().getQualifier().toStringUtf8());
    assertEquals(1, tableCFs[0].getFamiliesCount());
    assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8());

    tabCFsMap.clear();
    tabCFsMap.put(tableName3, new ArrayList<>());
    tabCFsMap.get(tableName3).add("cf1");
    tabCFsMap.get(tableName3).add("cf3");
    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
    assertEquals(1, tableCFs.length);
    assertEquals(tableName3.toString(), tableCFs[0].getTableName().getQualifier().toStringUtf8());
    assertEquals(2, tableCFs[0].getFamiliesCount());
    assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8());
    assertEquals("cf3", tableCFs[0].getFamilies(1).toStringUtf8());

    // 3. multiple tables in one map
    tabCFsMap.clear();
    tabCFsMap.put(tableName1, null);
    tabCFsMap.put(tableName2, new ArrayList<>());
    tabCFsMap.get(tableName2).add("cf1");
    tabCFsMap.put(tableName3, new ArrayList<>());
    tabCFsMap.get(tableName3).add("cf1");
    tabCFsMap.get(tableName3).add("cf3");

    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
    assertEquals(3, tableCFs.length);
    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()));
    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString()));
    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString()));

    assertEquals(0,
      ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()).getFamiliesCount());

    assertEquals(1,
      ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString()).getFamiliesCount());
    assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString())
      .getFamilies(0).toStringUtf8());

    assertEquals(2,
      ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString()).getFamiliesCount());
    assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
      .getFamilies(0).toStringUtf8());
    assertEquals("cf3", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
      .getFamilies(1).toStringUtf8());

    // 4. convert the array back to a map and check it matches what was put in
    tabCFsMap = ReplicationPeerConfigUtil.convert2Map(tableCFs);
    assertEquals(3, tabCFsMap.size());
    assertTrue(tabCFsMap.containsKey(tableName1));
    assertTrue(tabCFsMap.containsKey(tableName2));
    assertTrue(tabCFsMap.containsKey(tableName3));
    // 4.1 table "tableName1" : null cf-list
    assertNull(tabCFsMap.get(tableName1));
    // 4.2 table "tableName2" : cf-list contains a single cf "cf1"
    assertEquals(1, tabCFsMap.get(tableName2).size());
    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
    // 4.3 table "tableName3" : cf-list contains "cf1" and "cf3"
    assertEquals(2, tabCFsMap.get(tableName3).size());
    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
  }

  /**
   * End-to-end check of per-table-CF replication from cluster 1 to its peers "2" and "3".
   * <p>
   * Phase A adds both peers with an initial table-CF map and verifies which families of tables
   * TA, TB and TC reach which peer. Phase B updates both peers' table-CF maps and repeats the
   * verification with a different row.
   */
  @Test
  public void testPerTableCFReplication() throws Exception {
    LOG.info("testPerTableCFReplication");
    try (Connection connection1 = ConnectionFactory.createConnection(conf1);
      Connection connection2 = ConnectionFactory.createConnection(conf2);
      Connection connection3 = ConnectionFactory.createConnection(conf3);
      Admin admin1 = connection1.getAdmin();
      Admin admin2 = connection2.getAdmin();
      Admin admin3 = connection3.getAdmin();
      Admin replicationAdmin = connection1.getAdmin()) {

      createTestTables(admin1);
      createTestTables(admin2);
      createTestTables(admin3);

      // The table handles are opened only after the tables exist, and are closed (before the
      // admins and connections) when this inner block exits.
      try (Table htab1A = connection1.getTable(tabAName);
        Table htab2A = connection2.getTable(tabAName);
        Table htab3A = connection3.getTable(tabAName);
        Table htab1B = connection1.getTable(tabBName);
        Table htab2B = connection2.getTable(tabBName);
        Table htab3B = connection3.getTable(tabBName);
        Table htab1C = connection1.getTable(tabCName);
        Table htab2C = connection2.getTable(tabCName);
        Table htab3C = connection3.getTable(tabCName)) {

        // A. add cluster2/cluster3 as peers to cluster1
        Map<TableName, List<String>> tableCFs = new HashMap<>();
        tableCFs.put(tabCName, null);
        tableCFs.put(tabBName, new ArrayList<>());
        tableCFs.get(tabBName).add("f1");
        tableCFs.get(tabBName).add("f3");
        ReplicationPeerConfig rpc2 =
          ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getRpcConnnectionURI())
            .setReplicateAllUserTables(false).setTableCFsMap(tableCFs).build();
        replicationAdmin.addReplicationPeer("2", rpc2);

        tableCFs.clear();
        tableCFs.put(tabAName, null);
        tableCFs.put(tabBName, new ArrayList<>());
        tableCFs.get(tabBName).add("f1");
        tableCFs.get(tabBName).add("f2");
        ReplicationPeerConfig rpc3 =
          ReplicationPeerConfig.newBuilder().setClusterKey(utility3.getRpcConnnectionURI())
            .setReplicateAllUserTables(false).setTableCFsMap(tableCFs).build();
        replicationAdmin.addReplicationPeer("3", rpc3);

        // A1. tableA can only be replicated to cluster3
        putAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
        ensureRowNotReplicated(row1, f1Name, htab2A);
        deleteAndWaitWithFamily(row1, f1Name, htab1A, htab3A);

        putAndWaitWithFamily(row1, f2Name, htab1A, htab3A);
        ensureRowNotReplicated(row1, f2Name, htab2A);
        deleteAndWaitWithFamily(row1, f2Name, htab1A, htab3A);

        putAndWaitWithFamily(row1, f3Name, htab1A, htab3A);
        ensureRowNotReplicated(row1, f3Name, htab2A);
        deleteAndWaitWithFamily(row1, f3Name, htab1A, htab3A);

        // A2. cf 'f1' of tableB can be replicated to both cluster2 and cluster3
        putAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);
        deleteAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);

        // cf 'f2' of tableB can only be replicated to cluster3
        putAndWaitWithFamily(row1, f2Name, htab1B, htab3B);
        ensureRowNotReplicated(row1, f2Name, htab2B);
        deleteAndWaitWithFamily(row1, f2Name, htab1B, htab3B);

        // cf 'f3' of tableB can only be replicated to cluster2
        putAndWaitWithFamily(row1, f3Name, htab1B, htab2B);
        ensureRowNotReplicated(row1, f3Name, htab3B);
        deleteAndWaitWithFamily(row1, f3Name, htab1B, htab2B);

        // A3. tableC can only be replicated to cluster2
        putAndWaitWithFamily(row1, f1Name, htab1C, htab2C);
        ensureRowNotReplicated(row1, f1Name, htab3C);
        deleteAndWaitWithFamily(row1, f1Name, htab1C, htab2C);

        putAndWaitWithFamily(row1, f2Name, htab1C, htab2C);
        ensureRowNotReplicated(row1, f2Name, htab3C);
        deleteAndWaitWithFamily(row1, f2Name, htab1C, htab2C);

        putAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
        ensureRowNotReplicated(row1, f3Name, htab3C);
        deleteAndWaitWithFamily(row1, f3Name, htab1C, htab2C);

        // B. change peers' replicable table-cf config
        tableCFs.clear();
        tableCFs.put(tabAName, new ArrayList<>());
        tableCFs.get(tabAName).add("f1");
        tableCFs.get(tabAName).add("f2");
        tableCFs.put(tabCName, new ArrayList<>());
        tableCFs.get(tabCName).add("f2");
        tableCFs.get(tabCName).add("f3");
        replicationAdmin.updateReplicationPeerConfig("2",
          ReplicationPeerConfig.newBuilder(replicationAdmin.getReplicationPeerConfig("2"))
            .setTableCFsMap(tableCFs).build());

        tableCFs.clear();
        tableCFs.put(tabBName, null);
        tableCFs.put(tabCName, new ArrayList<>());
        tableCFs.get(tabCName).add("f3");
        replicationAdmin.updateReplicationPeerConfig("3",
          ReplicationPeerConfig.newBuilder(replicationAdmin.getReplicationPeerConfig("3"))
            .setTableCFsMap(tableCFs).build());

        // B1. cf 'f1' of tableA can only be replicated to cluster2
        putAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
        ensureRowNotReplicated(row2, f1Name, htab3A);
        deleteAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
        // cf 'f2' of tableA can only be replicated to cluster2
        putAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
        ensureRowNotReplicated(row2, f2Name, htab3A);
        deleteAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
        // cf 'f3' of tableA isn't replicable to either cluster2 or cluster3
        putAndWaitWithFamily(row2, f3Name, htab1A);
        ensureRowNotReplicated(row2, f3Name, htab2A, htab3A);
        deleteAndWaitWithFamily(row2, f3Name, htab1A);

        // B2. tableB can only be replicated to cluster3
        putAndWaitWithFamily(row2, f1Name, htab1B, htab3B);
        ensureRowNotReplicated(row2, f1Name, htab2B);
        deleteAndWaitWithFamily(row2, f1Name, htab1B, htab3B);

        putAndWaitWithFamily(row2, f2Name, htab1B, htab3B);
        ensureRowNotReplicated(row2, f2Name, htab2B);
        deleteAndWaitWithFamily(row2, f2Name, htab1B, htab3B);

        putAndWaitWithFamily(row2, f3Name, htab1B, htab3B);
        ensureRowNotReplicated(row2, f3Name, htab2B);
        deleteAndWaitWithFamily(row2, f3Name, htab1B, htab3B);

        // B3. cf 'f1' of tableC is non-replicable to either cluster
        putAndWaitWithFamily(row2, f1Name, htab1C);
        ensureRowNotReplicated(row2, f1Name, htab2C, htab3C);
        deleteAndWaitWithFamily(row2, f1Name, htab1C);
        // cf 'f2' of tableC can only be replicated to cluster2
        putAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
        ensureRowNotReplicated(row2, f2Name, htab3C);
        deleteAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
        // cf 'f3' of tableC can be replicated to cluster2 and cluster3
        putAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
        deleteAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
      }
    }
  }

  /** Creates tables TA, TB and TC, in that order, through the given admin. */
  private static void createTestTables(Admin admin) throws IOException {
    admin.createTable(tabA);
    admin.createTable(tabB);
    admin.createTable(tabC);
  }

  /**
   * Asserts that a single, immediate read of {@code fam} in {@code row} returns no cells from
   * any of the given tables. The check does not wait or retry.
   * @param row    row key to read
   * @param fam    column family to read
   * @param tables tables that must not contain the row/family
   */
  private void ensureRowNotReplicated(byte[] row, byte[] fam, Table... tables) throws IOException {
    Get get = new Get(row);
    get.addFamily(fam);
    for (Table unexpectedTarget : tables) {
      Result res = unexpectedTarget.get(get);
      assertEquals("Row unexpectedly replicated", 0, res.size());
    }
  }

  /**
   * Deletes family {@code fam} of {@code row} from {@code source}, then polls every target until
   * the family reads back empty from all of them. Polls are {@link #SLEEP_TIME} ms apart; the
   * test fails once the loop counter reaches its last value ({@code NB_RETRIES - 1}).
   * @param row     row key to delete
   * @param fam     column family to delete
   * @param source  table the delete is issued against
   * @param targets tables expected to observe the delete; may be empty
   */
  private void deleteAndWaitWithFamily(byte[] row, byte[] fam, Table source, Table... targets)
    throws Exception {
    Delete del = new Delete(row);
    del.addFamily(fam);
    source.delete(del);

    Get get = new Get(row);
    get.addFamily(fam);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      boolean removedFromAll = true;
      for (Table target : targets) {
        Result res = target.get(get);
        if (res.size() >= 1) {
          LOG.info("Row not deleted");
          removedFromAll = false;
          break;
        }
      }
      if (removedFromAll) {
        break;
      } else {
        Thread.sleep(SLEEP_TIME);
      }
    }
  }

  /**
   * Writes one cell (family {@code fam}, qualifier equal to the row key, value {@code val}) to
   * {@code source}, then polls every target until the cell is readable from all of them. Polls
   * are {@link #SLEEP_TIME} ms apart; the test fails once the loop counter reaches its last
   * value ({@code NB_RETRIES - 1}).
   * @param row     row key to write; also used as the column qualifier
   * @param fam     column family to write
   * @param source  table the put is issued against
   * @param targets tables expected to receive the cell; may be empty
   */
  private void putAndWaitWithFamily(byte[] row, byte[] fam, Table source, Table... targets)
    throws Exception {
    Put put = new Put(row);
    put.addColumn(fam, row, val);
    source.put(put);

    Get get = new Get(row);
    get.addFamily(fam);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      boolean replicatedToAll = true;
      for (Table target : targets) {
        Result res = target.get(get);
        if (res.isEmpty()) {
          LOG.info("Row not available");
          replicatedToAll = false;
          break;
        } else {
          assertEquals(1, res.size());
          assertArrayEquals(val, res.value());
        }
      }
      if (replicatedToAll) {
        break;
      } else {
        Thread.sleep(SLEEP_TIME);
      }
    }
  }
}