001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertTrue;
026import static org.junit.Assert.fail;
027
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Map;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.Admin;
040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.ConnectionFactory;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Get;
045import org.apache.hadoop.hbase.client.Put;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.client.TableDescriptor;
049import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
050import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
051import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
052import org.apache.hadoop.hbase.testclassification.FlakeyTests;
053import org.apache.hadoop.hbase.testclassification.LargeTests;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
056import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
057import org.junit.AfterClass;
058import org.junit.BeforeClass;
059import org.junit.ClassRule;
060import org.junit.Rule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063import org.junit.rules.TestName;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
068
069@Category({ FlakeyTests.class, LargeTests.class })
070public class TestPerTableCFReplication {
071
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074    HBaseClassTestRule.forClass(TestPerTableCFReplication.class);
075
076  private static final Logger LOG = LoggerFactory.getLogger(TestPerTableCFReplication.class);
077
078  private static Configuration conf1;
079  private static Configuration conf2;
080  private static Configuration conf3;
081
082  private static HBaseTestingUtil utility1;
083  private static HBaseTestingUtil utility2;
084  private static HBaseTestingUtil utility3;
085  private static final long SLEEP_TIME = 500;
086  private static final int NB_RETRIES = 100;
087
088  private static final TableName tableName = TableName.valueOf("test");
089  private static final TableName tabAName = TableName.valueOf("TA");
090  private static final TableName tabBName = TableName.valueOf("TB");
091  private static final TableName tabCName = TableName.valueOf("TC");
092  private static final byte[] famName = Bytes.toBytes("f");
093  private static final byte[] f1Name = Bytes.toBytes("f1");
094  private static final byte[] f2Name = Bytes.toBytes("f2");
095  private static final byte[] f3Name = Bytes.toBytes("f3");
096  private static final byte[] row1 = Bytes.toBytes("row1");
097  private static final byte[] row2 = Bytes.toBytes("row2");
098  private static final byte[] noRepfamName = Bytes.toBytes("norep");
099  private static final byte[] val = Bytes.toBytes("myval");
100
101  private static TableDescriptor table;
102  private static TableDescriptor tabA;
103  private static TableDescriptor tabB;
104  private static TableDescriptor tabC;
105
106  @Rule
107  public TestName name = new TestName();
108
109  @BeforeClass
110  public static void setUpBeforeClass() throws Exception {
111    conf1 = HBaseConfiguration.create();
112    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
113    // smaller block size and capacity to trigger more operations
114    // and test them
115    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
116    conf1.setInt("replication.source.size.capacity", 1024);
117    conf1.setLong("replication.source.sleepforretries", 100);
118    conf1.setInt("hbase.regionserver.maxlogs", 10);
119    conf1.setLong("hbase.master.logcleaner.ttl", 10);
120    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
121    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
122      "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
123
124    utility1 = new HBaseTestingUtil(conf1);
125    utility1.startMiniZKCluster();
126    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
127    new ZKWatcher(conf1, "cluster1", null, true);
128
129    conf2 = new Configuration(conf1);
130    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
131
132    conf3 = new Configuration(conf1);
133    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
134
135    utility2 = new HBaseTestingUtil(conf2);
136    utility2.setZkCluster(miniZK);
137    new ZKWatcher(conf2, "cluster3", null, true);
138
139    utility3 = new HBaseTestingUtil(conf3);
140    utility3.setZkCluster(miniZK);
141    new ZKWatcher(conf3, "cluster3", null, true);
142
143    table = TableDescriptorBuilder.newBuilder(tableName)
144      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
145        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
146      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
147
148    tabA = TableDescriptorBuilder.newBuilder(tabAName)
149      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name)
150        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
151      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name)
152        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
153      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f3Name)
154        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
155      .build();
156
157    tabB = TableDescriptorBuilder.newBuilder(tabBName)
158      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name)
159        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
160      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name)
161        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
162      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f3Name)
163        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
164      .build();
165
166    tabC = TableDescriptorBuilder.newBuilder(tabCName)
167      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name)
168        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
169      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name)
170        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
171      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f3Name)
172        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
173      .build();
174
175    utility1.startMiniCluster();
176    utility2.startMiniCluster();
177    utility3.startMiniCluster();
178  }
179
180  @AfterClass
181  public static void tearDownAfterClass() throws Exception {
182    utility3.shutdownMiniCluster();
183    utility2.shutdownMiniCluster();
184    utility1.shutdownMiniCluster();
185  }
186
187  @Test
188  public void testParseTableCFsFromConfig() {
189    Map<TableName, List<String>> tabCFsMap = null;
190
191    // 1. null or empty string, result should be null
192    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(null);
193    assertEquals(null, tabCFsMap);
194
195    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig("");
196    assertEquals(null, tabCFsMap);
197
198    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig("   ");
199    assertEquals(null, tabCFsMap);
200
201    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
202    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
203    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");
204
205    // 2. single table: "tableName1" / "tableName2:cf1" / "tableName3:cf1,cf3"
206    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1.getNameAsString());
207    assertEquals(1, tabCFsMap.size()); // only one table
208    assertTrue(tabCFsMap.containsKey(tableName1)); // its table name is "tableName1"
209    assertFalse(tabCFsMap.containsKey(tableName2)); // not other table
210    assertEquals(null, tabCFsMap.get(tableName1)); // null cf-list,
211
212    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName2 + ":cf1");
213    assertEquals(1, tabCFsMap.size()); // only one table
214    assertTrue(tabCFsMap.containsKey(tableName2)); // its table name is "tableName2"
215    assertFalse(tabCFsMap.containsKey(tableName1)); // not other table
216    assertEquals(1, tabCFsMap.get(tableName2).size()); // cf-list contains only 1 cf
217    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));// the only cf is "cf1"
218
219    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName3 + " : cf1 , cf3");
220    assertEquals(1, tabCFsMap.size()); // only one table
221    assertTrue(tabCFsMap.containsKey(tableName3)); // its table name is "tableName2"
222    assertFalse(tabCFsMap.containsKey(tableName1)); // not other table
223    assertEquals(2, tabCFsMap.get(tableName3).size()); // cf-list contains 2 cf
224    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));// contains "cf1"
225    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));// contains "cf3"
226
227    // 3. multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
228    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
229      tableName1 + " ; " + tableName2 + ":cf1 ; " + tableName3 + ":cf1,cf3");
230    // 3.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
231    assertEquals(3, tabCFsMap.size());
232    assertTrue(tabCFsMap.containsKey(tableName1));
233    assertTrue(tabCFsMap.containsKey(tableName2));
234    assertTrue(tabCFsMap.containsKey(tableName3));
235    // 3.2 table "tab1" : null cf-list
236    assertEquals(null, tabCFsMap.get(tableName1));
237    // 3.3 table "tab2" : cf-list contains a single cf "cf1"
238    assertEquals(1, tabCFsMap.get(tableName2).size());
239    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
240    // 3.4 table "tab3" : cf-list contains "cf1" and "cf3"
241    assertEquals(2, tabCFsMap.get(tableName3).size());
242    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
243    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
244
245    // 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated
246    // still use the example of multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
247    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
248      tableName1 + " ; ; " + tableName2 + ":cf1 ; " + tableName3 + ":cf1,,cf3 ;");
249    // 4.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
250    assertEquals(3, tabCFsMap.size());
251    assertTrue(tabCFsMap.containsKey(tableName1));
252    assertTrue(tabCFsMap.containsKey(tableName2));
253    assertTrue(tabCFsMap.containsKey(tableName3));
254    // 4.2 table "tab1" : null cf-list
255    assertEquals(null, tabCFsMap.get(tableName1));
256    // 4.3 table "tab2" : cf-list contains a single cf "cf1"
257    assertEquals(1, tabCFsMap.get(tableName2).size());
258    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
259    // 4.4 table "tab3" : cf-list contains "cf1" and "cf3"
260    assertEquals(2, tabCFsMap.get(tableName3).size());
261    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
262    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
263
264    // 5. invalid format "tableName1:tt:cf1 ; tableName2::cf1 ; tableName3:cf1,cf3"
265    // "tableName1:tt:cf1" and "tableName2::cf1" are invalid and will be ignored totally
266    tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
267      tableName1 + ":tt:cf1 ; " + tableName2 + "::cf1 ; " + tableName3 + ":cf1,cf3");
268    // 5.1 no "tableName1" and "tableName2", only "tableName3"
269    assertEquals(1, tabCFsMap.size()); // only one table
270    assertFalse(tabCFsMap.containsKey(tableName1));
271    assertFalse(tabCFsMap.containsKey(tableName2));
272    assertTrue(tabCFsMap.containsKey(tableName3));
273    // 5.2 table "tableName3" : cf-list contains "cf1" and "cf3"
274    assertEquals(2, tabCFsMap.get(tableName3).size());
275    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
276    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
277  }
278
279  @Test
280  public void testTableCFsHelperConverter() {
281
282    ReplicationProtos.TableCF[] tableCFs = null;
283    Map<TableName, List<String>> tabCFsMap = null;
284
285    // 1. null or empty string, result should be null
286    assertNull(ReplicationPeerConfigUtil.convert(tabCFsMap));
287
288    tabCFsMap = new HashMap<>();
289    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
290    assertEquals(0, tableCFs.length);
291
292    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
293    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
294    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");
295
296    // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
297    tabCFsMap.clear();
298    tabCFsMap.put(tableName1, null);
299    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
300    assertEquals(1, tableCFs.length); // only one table
301    assertEquals(tableName1.toString(), tableCFs[0].getTableName().getQualifier().toStringUtf8());
302    assertEquals(0, tableCFs[0].getFamiliesCount());
303
304    tabCFsMap.clear();
305    tabCFsMap.put(tableName2, new ArrayList<>());
306    tabCFsMap.get(tableName2).add("cf1");
307    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
308    assertEquals(1, tableCFs.length); // only one table
309    assertEquals(tableName2.toString(), tableCFs[0].getTableName().getQualifier().toStringUtf8());
310    assertEquals(1, tableCFs[0].getFamiliesCount());
311    assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8());
312
313    tabCFsMap.clear();
314    tabCFsMap.put(tableName3, new ArrayList<>());
315    tabCFsMap.get(tableName3).add("cf1");
316    tabCFsMap.get(tableName3).add("cf3");
317    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
318    assertEquals(1, tableCFs.length);
319    assertEquals(tableName3.toString(), tableCFs[0].getTableName().getQualifier().toStringUtf8());
320    assertEquals(2, tableCFs[0].getFamiliesCount());
321    assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8());
322    assertEquals("cf3", tableCFs[0].getFamilies(1).toStringUtf8());
323
324    tabCFsMap.clear();
325    tabCFsMap.put(tableName1, null);
326    tabCFsMap.put(tableName2, new ArrayList<>());
327    tabCFsMap.get(tableName2).add("cf1");
328    tabCFsMap.put(tableName3, new ArrayList<>());
329    tabCFsMap.get(tableName3).add("cf1");
330    tabCFsMap.get(tableName3).add("cf3");
331
332    tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
333    assertEquals(3, tableCFs.length);
334    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()));
335    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString()));
336    assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString()));
337
338    assertEquals(0,
339      ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()).getFamiliesCount());
340
341    assertEquals(1,
342      ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString()).getFamiliesCount());
343    assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString())
344      .getFamilies(0).toStringUtf8());
345
346    assertEquals(2,
347      ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString()).getFamiliesCount());
348    assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
349      .getFamilies(0).toStringUtf8());
350    assertEquals("cf3", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
351      .getFamilies(1).toStringUtf8());
352
353    tabCFsMap = ReplicationPeerConfigUtil.convert2Map(tableCFs);
354    assertEquals(3, tabCFsMap.size());
355    assertTrue(tabCFsMap.containsKey(tableName1));
356    assertTrue(tabCFsMap.containsKey(tableName2));
357    assertTrue(tabCFsMap.containsKey(tableName3));
358    // 3.2 table "tab1" : null cf-list
359    assertEquals(null, tabCFsMap.get(tableName1));
360    // 3.3 table "tab2" : cf-list contains a single cf "cf1"
361    assertEquals(1, tabCFsMap.get(tableName2).size());
362    assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
363    // 3.4 table "tab3" : cf-list contains "cf1" and "cf3"
364    assertEquals(2, tabCFsMap.get(tableName3).size());
365    assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
366    assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
367  }
368
369  @Test
370  public void testPerTableCFReplication() throws Exception {
371    LOG.info("testPerTableCFReplication");
372    try (Connection connection1 = ConnectionFactory.createConnection(conf1);
373      Connection connection2 = ConnectionFactory.createConnection(conf2);
374      Connection connection3 = ConnectionFactory.createConnection(conf3);
375      Admin admin1 = connection1.getAdmin(); Admin admin2 = connection2.getAdmin();
376      Admin admin3 = connection3.getAdmin(); Admin replicationAdmin = connection1.getAdmin()) {
377
378      admin1.createTable(tabA);
379      admin1.createTable(tabB);
380      admin1.createTable(tabC);
381      admin2.createTable(tabA);
382      admin2.createTable(tabB);
383      admin2.createTable(tabC);
384      admin3.createTable(tabA);
385      admin3.createTable(tabB);
386      admin3.createTable(tabC);
387
388      Table htab1A = connection1.getTable(tabAName);
389      Table htab2A = connection2.getTable(tabAName);
390      Table htab3A = connection3.getTable(tabAName);
391
392      Table htab1B = connection1.getTable(tabBName);
393      Table htab2B = connection2.getTable(tabBName);
394      Table htab3B = connection3.getTable(tabBName);
395
396      Table htab1C = connection1.getTable(tabCName);
397      Table htab2C = connection2.getTable(tabCName);
398      Table htab3C = connection3.getTable(tabCName);
399
400      // A. add cluster2/cluster3 as peers to cluster1
401      Map<TableName, List<String>> tableCFs = new HashMap<>();
402      tableCFs.put(tabCName, null);
403      tableCFs.put(tabBName, new ArrayList<>());
404      tableCFs.get(tabBName).add("f1");
405      tableCFs.get(tabBName).add("f3");
406      ReplicationPeerConfig rpc2 =
407        ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getRpcConnnectionURI())
408          .setReplicateAllUserTables(false).setTableCFsMap(tableCFs).build();
409      replicationAdmin.addReplicationPeer("2", rpc2);
410
411      tableCFs.clear();
412      tableCFs.put(tabAName, null);
413      tableCFs.put(tabBName, new ArrayList<>());
414      tableCFs.get(tabBName).add("f1");
415      tableCFs.get(tabBName).add("f2");
416      ReplicationPeerConfig rpc3 =
417        ReplicationPeerConfig.newBuilder().setClusterKey(utility3.getRpcConnnectionURI())
418          .setReplicateAllUserTables(false).setTableCFsMap(tableCFs).build();
419      replicationAdmin.addReplicationPeer("3", rpc3);
420
421      // A1. tableA can only replicated to cluster3
422      putAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
423      ensureRowNotReplicated(row1, f1Name, htab2A);
424      deleteAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
425
426      putAndWaitWithFamily(row1, f2Name, htab1A, htab3A);
427      ensureRowNotReplicated(row1, f2Name, htab2A);
428      deleteAndWaitWithFamily(row1, f2Name, htab1A, htab3A);
429
430      putAndWaitWithFamily(row1, f3Name, htab1A, htab3A);
431      ensureRowNotReplicated(row1, f3Name, htab2A);
432      deleteAndWaitWithFamily(row1, f3Name, htab1A, htab3A);
433
434      // A2. cf 'f1' of tableB can replicated to both cluster2 and cluster3
435      putAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);
436      deleteAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);
437
438      // cf 'f2' of tableB can only replicated to cluster3
439      putAndWaitWithFamily(row1, f2Name, htab1B, htab3B);
440      ensureRowNotReplicated(row1, f2Name, htab2B);
441      deleteAndWaitWithFamily(row1, f2Name, htab1B, htab3B);
442
443      // cf 'f3' of tableB can only replicated to cluster2
444      putAndWaitWithFamily(row1, f3Name, htab1B, htab2B);
445      ensureRowNotReplicated(row1, f3Name, htab3B);
446      deleteAndWaitWithFamily(row1, f3Name, htab1B, htab2B);
447
448      // A3. tableC can only replicated to cluster2
449      putAndWaitWithFamily(row1, f1Name, htab1C, htab2C);
450      ensureRowNotReplicated(row1, f1Name, htab3C);
451      deleteAndWaitWithFamily(row1, f1Name, htab1C, htab2C);
452
453      putAndWaitWithFamily(row1, f2Name, htab1C, htab2C);
454      ensureRowNotReplicated(row1, f2Name, htab3C);
455      deleteAndWaitWithFamily(row1, f2Name, htab1C, htab2C);
456
457      putAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
458      ensureRowNotReplicated(row1, f3Name, htab3C);
459      deleteAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
460
461      // B. change peers' replicable table-cf config
462      tableCFs.clear();
463      tableCFs.put(tabAName, new ArrayList<>());
464      tableCFs.get(tabAName).add("f1");
465      tableCFs.get(tabAName).add("f2");
466      tableCFs.put(tabCName, new ArrayList<>());
467      tableCFs.get(tabCName).add("f2");
468      tableCFs.get(tabCName).add("f3");
469      replicationAdmin.updateReplicationPeerConfig("2",
470        ReplicationPeerConfig.newBuilder(replicationAdmin.getReplicationPeerConfig("2"))
471          .setTableCFsMap(tableCFs).build());
472
473      tableCFs.clear();
474      tableCFs.put(tabBName, null);
475      tableCFs.put(tabCName, new ArrayList<>());
476      tableCFs.get(tabCName).add("f3");
477      replicationAdmin.updateReplicationPeerConfig("3",
478        ReplicationPeerConfig.newBuilder(replicationAdmin.getReplicationPeerConfig("3"))
479          .setTableCFsMap(tableCFs).build());
480
481      // B1. cf 'f1' of tableA can only replicated to cluster2
482      putAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
483      ensureRowNotReplicated(row2, f1Name, htab3A);
484      deleteAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
485      // cf 'f2' of tableA can only replicated to cluster2
486      putAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
487      ensureRowNotReplicated(row2, f2Name, htab3A);
488      deleteAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
489      // cf 'f3' of tableA isn't replicable to either cluster2 or cluster3
490      putAndWaitWithFamily(row2, f3Name, htab1A);
491      ensureRowNotReplicated(row2, f3Name, htab2A, htab3A);
492      deleteAndWaitWithFamily(row2, f3Name, htab1A);
493
494      // B2. tableB can only replicated to cluster3
495      putAndWaitWithFamily(row2, f1Name, htab1B, htab3B);
496      ensureRowNotReplicated(row2, f1Name, htab2B);
497      deleteAndWaitWithFamily(row2, f1Name, htab1B, htab3B);
498
499      putAndWaitWithFamily(row2, f2Name, htab1B, htab3B);
500      ensureRowNotReplicated(row2, f2Name, htab2B);
501      deleteAndWaitWithFamily(row2, f2Name, htab1B, htab3B);
502
503      putAndWaitWithFamily(row2, f3Name, htab1B, htab3B);
504      ensureRowNotReplicated(row2, f3Name, htab2B);
505      deleteAndWaitWithFamily(row2, f3Name, htab1B, htab3B);
506
507      // B3. cf 'f1' of tableC non-replicable to either cluster
508      putAndWaitWithFamily(row2, f1Name, htab1C);
509      ensureRowNotReplicated(row2, f1Name, htab2C, htab3C);
510      deleteAndWaitWithFamily(row2, f1Name, htab1C);
511      // cf 'f2' of tableC can only replicated to cluster2
512      putAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
513      ensureRowNotReplicated(row2, f2Name, htab3C);
514      deleteAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
515      // cf 'f3' of tableC can replicated to cluster2 and cluster3
516      putAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
517      deleteAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
518    }
519  }
520
521  private void ensureRowNotReplicated(byte[] row, byte[] fam, Table... tables) throws IOException {
522    Get get = new Get(row);
523    get.addFamily(fam);
524    for (Table table : tables) {
525      Result res = table.get(get);
526      assertEquals(0, res.size());
527    }
528  }
529
530  private void deleteAndWaitWithFamily(byte[] row, byte[] fam, Table source, Table... targets)
531    throws Exception {
532    Delete del = new Delete(row);
533    del.addFamily(fam);
534    source.delete(del);
535
536    Get get = new Get(row);
537    get.addFamily(fam);
538    for (int i = 0; i < NB_RETRIES; i++) {
539      if (i == NB_RETRIES - 1) {
540        fail("Waited too much time for del replication");
541      }
542      boolean removedFromAll = true;
543      for (Table target : targets) {
544        Result res = target.get(get);
545        if (res.size() >= 1) {
546          LOG.info("Row not deleted");
547          removedFromAll = false;
548          break;
549        }
550      }
551      if (removedFromAll) {
552        break;
553      } else {
554        Thread.sleep(SLEEP_TIME);
555      }
556    }
557  }
558
559  private void putAndWaitWithFamily(byte[] row, byte[] fam, Table source, Table... targets)
560    throws Exception {
561    Put put = new Put(row);
562    put.addColumn(fam, row, val);
563    source.put(put);
564
565    Get get = new Get(row);
566    get.addFamily(fam);
567    for (int i = 0; i < NB_RETRIES; i++) {
568      if (i == NB_RETRIES - 1) {
569        fail("Waited too much time for put replication");
570      }
571      boolean replicatedToAll = true;
572      for (Table target : targets) {
573        Result res = target.get(get);
574        if (res.isEmpty()) {
575          LOG.info("Row not available");
576          replicatedToAll = false;
577          break;
578        } else {
579          assertEquals(1, res.size());
580          assertArrayEquals(val, res.value());
581        }
582      }
583      if (replicatedToAll) {
584        break;
585      } else {
586        Thread.sleep(SLEEP_TIME);
587      }
588    }
589  }
590}