001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.fail; 024 025import java.io.Closeable; 026import java.io.IOException; 027import java.util.Arrays; 028import java.util.List; 029import java.util.Optional; 030import java.util.concurrent.CountDownLatch; 031import java.util.concurrent.ThreadLocalRandom; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.DoNotRetryIOException; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.HBaseTestingUtil; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.KeyValue; 042import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.client.Admin; 045import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 046import org.apache.hadoop.hbase.client.Connection; 047import org.apache.hadoop.hbase.client.ConnectionFactory; 048import org.apache.hadoop.hbase.client.Delete; 049import org.apache.hadoop.hbase.client.Durability; 050import org.apache.hadoop.hbase.client.Get; 051import org.apache.hadoop.hbase.client.Put; 052import org.apache.hadoop.hbase.client.Result; 053import org.apache.hadoop.hbase.client.Table; 054import org.apache.hadoop.hbase.client.TableDescriptor; 055import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 056import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 057import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 058import org.apache.hadoop.hbase.coprocessor.ObserverContext; 059import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 060import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 061import org.apache.hadoop.hbase.coprocessor.RegionObserver; 062import org.apache.hadoop.hbase.regionserver.HRegion; 063import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 064import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider; 065import org.apache.hadoop.hbase.testclassification.LargeTests; 066import org.apache.hadoop.hbase.testclassification.ReplicationTests; 067import org.apache.hadoop.hbase.tool.BulkLoadHFiles; 068import org.apache.hadoop.hbase.util.Bytes; 069import org.apache.hadoop.hbase.util.HFileTestUtil; 070import org.apache.hadoop.hbase.wal.WALEdit; 071import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; 072import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 073import org.junit.After; 074import org.junit.Assert; 075import org.junit.Before; 076import org.junit.ClassRule; 077import org.junit.Test; 078import org.junit.experimental.categories.Category; 079import org.slf4j.Logger; 080import org.slf4j.LoggerFactory; 081 082@Category({ ReplicationTests.class, LargeTests.class }) 083public class TestMasterReplication { 084 085 @ClassRule 086 public static final HBaseClassTestRule CLASS_RULE = 087 HBaseClassTestRule.forClass(TestMasterReplication.class); 088 089 private static final Logger LOG = LoggerFactory.getLogger(TestMasterReplication.class); 090 091 private Configuration baseConfiguration; 092 093 private HBaseTestingUtil[] utilities; 094 private Configuration[] configurations; 095 private MiniZooKeeperCluster miniZK; 096 097 private static final long SLEEP_TIME = 1000; 098 private static final int NB_RETRIES = 120; 099 100 private static final TableName tableName = TableName.valueOf("test"); 101 private static final byte[] famName = Bytes.toBytes("f"); 102 private static final byte[] famName1 = Bytes.toBytes("f1"); 103 private static final byte[] row = Bytes.toBytes("row"); 104 private static final byte[] row1 = Bytes.toBytes("row1"); 105 private static final byte[] row2 = Bytes.toBytes("row2"); 106 private static final byte[] row3 = Bytes.toBytes("row3"); 107 private static final byte[] row4 = Bytes.toBytes("row4"); 108 private static final byte[] noRepfamName = Bytes.toBytes("norep"); 109 110 private static final byte[] count = Bytes.toBytes("count"); 111 private static final byte[] put = Bytes.toBytes("put"); 112 private static final byte[] delete = Bytes.toBytes("delete"); 113 114 private TableDescriptor table; 115 116 @Before 117 public void setUp() throws Exception { 118 baseConfiguration = HBaseConfiguration.create(); 119 // smaller block size and capacity to trigger more operations 120 // and test them 121 baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20); 122 baseConfiguration.setInt("replication.source.size.capacity", 1024); 123 baseConfiguration.setLong("replication.source.sleepforretries", 100); 124 baseConfiguration.setInt("hbase.regionserver.maxlogs", 10); 125 baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10); 126 baseConfiguration.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); 127 baseConfiguration.set("hbase.replication.source.fs.conf.provider", 128 TestSourceFSConfigurationProvider.class.getCanonicalName()); 129 baseConfiguration.set(HConstants.REPLICATION_CLUSTER_ID, "12345"); 130 baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); 131 baseConfiguration.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, 132 CoprocessorCounter.class.getName()); 133 table = TableDescriptorBuilder.newBuilder(tableName) 134 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName) 135 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 136 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName1) 137 .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) 138 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); 139 } 140 141 /** 142 * It tests the replication scenario involving 0 -> 1 -> 0. It does it by adding and deleting a 143 * row to a table in each cluster, checking if it's replicated. It also tests that the puts and 144 * deletes are not replicated back to the originating cluster. 145 */ 146 @Test 147 public void testCyclicReplication1() throws Exception { 148 LOG.info("testSimplePutDelete"); 149 int numClusters = 2; 150 Table[] htables = null; 151 try { 152 htables = setUpClusterTablesAndPeers(numClusters); 153 154 int[] expectedCounts = new int[] { 2, 2 }; 155 156 // add rows to both clusters, 157 // make sure they are both replication 158 putAndWait(row, famName, htables[0], htables[1]); 159 putAndWait(row1, famName, htables[1], htables[0]); 160 validateCounts(htables, put, expectedCounts); 161 162 deleteAndWait(row, htables[0], htables[1]); 163 deleteAndWait(row1, htables[1], htables[0]); 164 validateCounts(htables, delete, expectedCounts); 165 } finally { 166 close(htables); 167 shutDownMiniClusters(); 168 } 169 } 170 171 /** 172 * Tests the replication scenario 0 -> 0. By default 173 * {@link org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint}, 174 * the replication peer should not be added. 175 */ 176 @Test(expected = DoNotRetryIOException.class) 177 public void testLoopedReplication() throws Exception { 178 LOG.info("testLoopedReplication"); 179 startMiniClusters(1); 180 createTableOnClusters(table); 181 addPeer("1", 0, 0); 182 } 183 184 /** 185 * It tests the replication scenario involving 0 -> 1 -> 0. It does it by bulk loading a set of 186 * HFiles to a table in each cluster, checking if it's replicated. 187 */ 188 @Test 189 public void testHFileCyclicReplication() throws Exception { 190 LOG.info("testHFileCyclicReplication"); 191 int numClusters = 2; 192 Table[] htables = null; 193 try { 194 htables = setUpClusterTablesAndPeers(numClusters); 195 196 // Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated 197 // to cluster '1'. 198 byte[][][] hfileRanges = 199 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, 200 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, }; 201 int numOfRows = 100; 202 int[] expectedCounts = 203 new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows }; 204 205 loadAndValidateHFileReplication("testHFileCyclicReplication_01", 0, new int[] { 1 }, row, 206 famName, htables, hfileRanges, numOfRows, expectedCounts, true); 207 208 // Load 200 rows for each hfile range in cluster '1' and validate whether its been replicated 209 // to cluster '0'. 210 hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") }, 211 new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, }; 212 numOfRows = 200; 213 int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0], 214 hfileRanges.length * numOfRows + expectedCounts[1] }; 215 216 loadAndValidateHFileReplication("testHFileCyclicReplication_10", 1, new int[] { 0 }, row, 217 famName, htables, hfileRanges, numOfRows, newExpectedCounts, true); 218 219 } finally { 220 close(htables); 221 shutDownMiniClusters(); 222 } 223 } 224 225 private Table[] setUpClusterTablesAndPeers(int numClusters) throws Exception { 226 Table[] htables; 227 startMiniClusters(numClusters); 228 createTableOnClusters(table); 229 230 htables = getHTablesOnClusters(tableName); 231 // Test the replication scenarios of 0 -> 1 -> 0 232 addPeer("1", 0, 1); 233 addPeer("1", 1, 0); 234 return htables; 235 } 236 237 /** 238 * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and deleting rows to a 239 * table in each clusters and ensuring that the each of these clusters get the appropriate 240 * mutations. It also tests the grouping scenario where a cluster needs to replicate the edits 241 * originating from itself and also the edits that it received using replication from a different 242 * cluster. The scenario is explained in HBASE-9158 243 */ 244 @Test 245 public void testCyclicReplication2() throws Exception { 246 LOG.info("testCyclicReplication2"); 247 int numClusters = 3; 248 Table[] htables = null; 249 try { 250 startMiniClusters(numClusters); 251 createTableOnClusters(table); 252 253 // Test the replication scenario of 0 -> 1 -> 2 -> 0 254 addPeer("1", 0, 1); 255 addPeer("1", 1, 2); 256 addPeer("1", 2, 0); 257 258 htables = getHTablesOnClusters(tableName); 259 260 // put "row" and wait 'til it got around 261 putAndWait(row, famName, htables[0], htables[2]); 262 putAndWait(row1, famName, htables[1], htables[0]); 263 putAndWait(row2, famName, htables[2], htables[1]); 264 265 deleteAndWait(row, htables[0], htables[2]); 266 deleteAndWait(row1, htables[1], htables[0]); 267 deleteAndWait(row2, htables[2], htables[1]); 268 269 int[] expectedCounts = new int[] { 3, 3, 3 }; 270 validateCounts(htables, put, expectedCounts); 271 validateCounts(htables, delete, expectedCounts); 272 273 // Test HBASE-9158 274 disablePeer("1", 2); 275 // we now have an edit that was replicated into cluster originating from 276 // cluster 0 277 putAndWait(row3, famName, htables[0], htables[1]); 278 // now add a local edit to cluster 1 279 htables[1].put(new Put(row4).addColumn(famName, row4, row4)); 280 // re-enable replication from cluster 2 to cluster 0 281 enablePeer("1", 2); 282 // without HBASE-9158 the edit for row4 would have been marked with 283 // cluster 0's id 284 // and hence not replicated to cluster 0 285 wait(row4, htables[0], false); 286 } finally { 287 close(htables); 288 shutDownMiniClusters(); 289 } 290 } 291 292 /** 293 * It tests the multi slave hfile replication scenario involving 0 -> 1, 2. It does it by bulk 294 * loading a set of HFiles to a table in master cluster, checking if it's replicated in its peers. 295 */ 296 @Test 297 public void testHFileMultiSlaveReplication() throws Exception { 298 LOG.info("testHFileMultiSlaveReplication"); 299 int numClusters = 3; 300 Table[] htables = null; 301 try { 302 startMiniClusters(numClusters); 303 createTableOnClusters(table); 304 305 // Add a slave, 0 -> 1 306 addPeer("1", 0, 1); 307 308 htables = getHTablesOnClusters(tableName); 309 310 // Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated 311 // to cluster '1'. 312 byte[][][] hfileRanges = 313 new byte[][][] { new byte[][] { Bytes.toBytes("mmmm"), Bytes.toBytes("oooo") }, 314 new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("rrr") }, }; 315 int numOfRows = 100; 316 317 int[] expectedCounts = 318 new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows }; 319 320 loadAndValidateHFileReplication("testHFileCyclicReplication_0", 0, new int[] { 1 }, row, 321 famName, htables, hfileRanges, numOfRows, expectedCounts, true); 322 323 // Validate data is not replicated to cluster '2'. 324 assertEquals(0, utilities[2].countRows(htables[2])); 325 326 rollWALAndWait(utilities[0], htables[0].getName(), row); 327 328 // Add one more slave, 0 -> 2 329 addPeer("2", 0, 2); 330 331 // Load 200 rows for each hfile range in cluster '0' and validate whether its been replicated 332 // to cluster '1' and '2'. Previous data should be replicated to cluster '2'. 333 hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("ssss"), Bytes.toBytes("uuuu") }, 334 new byte[][] { Bytes.toBytes("vvv"), Bytes.toBytes("xxx") }, }; 335 numOfRows = 200; 336 337 int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0], 338 hfileRanges.length * numOfRows + expectedCounts[1], hfileRanges.length * numOfRows }; 339 340 loadAndValidateHFileReplication("testHFileCyclicReplication_1", 0, new int[] { 1, 2 }, row, 341 famName, htables, hfileRanges, numOfRows, newExpectedCounts, true); 342 343 } finally { 344 close(htables); 345 shutDownMiniClusters(); 346 } 347 } 348 349 /** 350 * It tests the bulk loaded hfile replication scenario to only explicitly specified table column 351 * families. It does it by bulk loading a set of HFiles belonging to both the CFs of table and set 352 * only one CF data to replicate. 353 */ 354 @Test 355 public void testHFileReplicationForConfiguredTableCfs() throws Exception { 356 LOG.info("testHFileReplicationForConfiguredTableCfs"); 357 int numClusters = 2; 358 Table[] htables = null; 359 try { 360 startMiniClusters(numClusters); 361 createTableOnClusters(table); 362 363 htables = getHTablesOnClusters(tableName); 364 // Test the replication scenarios only 'f' is configured for table data replication not 'f1' 365 addPeer("1", 0, 1, tableName.getNameAsString() + ":" + Bytes.toString(famName)); 366 367 // Load 100 rows for each hfile range in cluster '0' for table CF 'f' 368 byte[][][] hfileRanges = 369 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, 370 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, }; 371 int numOfRows = 100; 372 int[] expectedCounts = 373 new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows }; 374 375 loadAndValidateHFileReplication("load_f", 0, new int[] { 1 }, row, famName, htables, 376 hfileRanges, numOfRows, expectedCounts, true); 377 378 // Load 100 rows for each hfile range in cluster '0' for table CF 'f1' 379 hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") }, 380 new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, }; 381 numOfRows = 100; 382 383 int[] newExpectedCounts = 384 new int[] { hfileRanges.length * numOfRows + expectedCounts[0], expectedCounts[1] }; 385 386 loadAndValidateHFileReplication("load_f1", 0, new int[] { 1 }, row, famName1, htables, 387 hfileRanges, numOfRows, newExpectedCounts, false); 388 389 // Validate data replication for CF 'f1' 390 391 // Source cluster table should contain data for the families 392 wait(0, htables[0], hfileRanges.length * numOfRows + expectedCounts[0]); 393 394 // Sleep for enough time so that the data is still not replicated for the CF which is not 395 // configured for replication 396 Thread.sleep((NB_RETRIES / 2) * SLEEP_TIME); 397 // Peer cluster should have only configured CF data 398 wait(1, htables[1], expectedCounts[1]); 399 } finally { 400 close(htables); 401 shutDownMiniClusters(); 402 } 403 } 404 405 /** 406 * Tests cyclic replication scenario of 0 -> 1 -> 2 -> 1. 407 */ 408 @Test 409 public void testCyclicReplication3() throws Exception { 410 LOG.info("testCyclicReplication2"); 411 int numClusters = 3; 412 Table[] htables = null; 413 try { 414 startMiniClusters(numClusters); 415 createTableOnClusters(table); 416 417 // Test the replication scenario of 0 -> 1 -> 2 -> 1 418 addPeer("1", 0, 1); 419 addPeer("1", 1, 2); 420 addPeer("1", 2, 1); 421 422 htables = getHTablesOnClusters(tableName); 423 424 // put "row" and wait 'til it got around 425 putAndWait(row, famName, htables[0], htables[2]); 426 putAndWait(row1, famName, htables[1], htables[2]); 427 putAndWait(row2, famName, htables[2], htables[1]); 428 429 deleteAndWait(row, htables[0], htables[2]); 430 deleteAndWait(row1, htables[1], htables[2]); 431 deleteAndWait(row2, htables[2], htables[1]); 432 433 int[] expectedCounts = new int[] { 1, 3, 3 }; 434 validateCounts(htables, put, expectedCounts); 435 validateCounts(htables, delete, expectedCounts); 436 } finally { 437 close(htables); 438 shutDownMiniClusters(); 439 } 440 } 441 442 /** 443 * Tests that base replication peer configs are applied on peer creation and the configs are 444 * overriden if updated as part of updateReplicationPeerConfig() 445 */ 446 @Test 447 public void testBasePeerConfigsForReplicationPeer() throws Exception { 448 LOG.info("testBasePeerConfigsForPeerMutations"); 449 String firstCustomPeerConfigKey = "hbase.xxx.custom_config"; 450 String firstCustomPeerConfigValue = "test"; 451 String firstCustomPeerConfigUpdatedValue = "test_updated"; 452 453 String secondCustomPeerConfigKey = "hbase.xxx.custom_second_config"; 454 String secondCustomPeerConfigValue = "testSecond"; 455 String secondCustomPeerConfigUpdatedValue = "testSecondUpdated"; 456 try { 457 baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 458 firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue)); 459 startMiniClusters(2); 460 addPeer("1", 0, 1); 461 addPeer("2", 0, 1); 462 Admin admin = utilities[0].getAdmin(); 463 464 // Validates base configs 1 is present for both peer. 465 Assert.assertEquals(firstCustomPeerConfigValue, 466 admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey)); 467 Assert.assertEquals(firstCustomPeerConfigValue, 468 admin.getReplicationPeerConfig("2").getConfiguration().get(firstCustomPeerConfigKey)); 469 470 // override value of configuration 1 for peer "1". 471 ReplicationPeerConfig updatedReplicationConfigForPeer1 = 472 ReplicationPeerConfig.newBuilder(admin.getReplicationPeerConfig("1")) 473 .putConfiguration(firstCustomPeerConfigKey, firstCustomPeerConfigUpdatedValue).build(); 474 475 // add configuration 2 for peer "2". 476 ReplicationPeerConfig updatedReplicationConfigForPeer2 = 477 ReplicationPeerConfig.newBuilder(admin.getReplicationPeerConfig("2")) 478 .putConfiguration(secondCustomPeerConfigKey, secondCustomPeerConfigUpdatedValue).build(); 479 480 admin.updateReplicationPeerConfig("1", updatedReplicationConfigForPeer1); 481 admin.updateReplicationPeerConfig("2", updatedReplicationConfigForPeer2); 482 483 // validates configuration is overridden by updateReplicationPeerConfig 484 Assert.assertEquals(firstCustomPeerConfigUpdatedValue, 485 admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey)); 486 Assert.assertEquals(secondCustomPeerConfigUpdatedValue, 487 admin.getReplicationPeerConfig("2").getConfiguration().get(secondCustomPeerConfigKey)); 488 489 // Add second config to base config and perform restart. 490 utilities[0].getConfiguration().set( 491 ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 492 firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue).concat(";") 493 .concat(secondCustomPeerConfigKey).concat("=").concat(secondCustomPeerConfigValue)); 494 495 utilities[0].shutdownMiniHBaseCluster(); 496 utilities[0].restartHBaseCluster(1); 497 admin = utilities[0].getAdmin(); 498 499 // Configurations should be updated after restart again 500 Assert.assertEquals(firstCustomPeerConfigValue, 501 admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey)); 502 Assert.assertEquals(firstCustomPeerConfigValue, 503 admin.getReplicationPeerConfig("2").getConfiguration().get(firstCustomPeerConfigKey)); 504 505 Assert.assertEquals(secondCustomPeerConfigValue, 506 admin.getReplicationPeerConfig("1").getConfiguration().get(secondCustomPeerConfigKey)); 507 Assert.assertEquals(secondCustomPeerConfigValue, 508 admin.getReplicationPeerConfig("2").getConfiguration().get(secondCustomPeerConfigKey)); 509 } finally { 510 shutDownMiniClusters(); 511 baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG); 512 } 513 } 514 515 @Test 516 public void testBasePeerConfigsRemovalForReplicationPeer() throws Exception { 517 LOG.info("testBasePeerConfigsForPeerMutations"); 518 String firstCustomPeerConfigKey = "hbase.xxx.custom_config"; 519 String firstCustomPeerConfigValue = "test"; 520 521 try { 522 baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 523 firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue)); 524 startMiniClusters(2); 525 addPeer("1", 0, 1); 526 Admin admin = utilities[0].getAdmin(); 527 528 // Validates base configs 1 is present for both peer. 529 Assert.assertEquals(firstCustomPeerConfigValue, 530 admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey)); 531 532 utilities[0].getConfiguration() 533 .unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG); 534 utilities[0].getConfiguration().set( 535 ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 536 firstCustomPeerConfigKey.concat("=").concat("")); 537 538 utilities[0].shutdownMiniHBaseCluster(); 539 utilities[0].restartHBaseCluster(1); 540 admin = utilities[0].getAdmin(); 541 542 // Configurations should be removed after restart again 543 Assert.assertNull( 544 admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey)); 545 } finally { 546 shutDownMiniClusters(); 547 baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG); 548 } 549 } 550 551 @Test 552 public void testRemoveBasePeerConfigWithoutExistingConfigForReplicationPeer() throws Exception { 553 LOG.info("testBasePeerConfigsForPeerMutations"); 554 String firstCustomPeerConfigKey = "hbase.xxx.custom_config"; 555 556 try { 557 baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 558 firstCustomPeerConfigKey.concat("=").concat("")); 559 startMiniClusters(2); 560 addPeer("1", 0, 1); 561 Admin admin = utilities[0].getAdmin(); 562 563 Assert.assertNull("Config should not be there", 564 admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey)); 565 } finally { 566 shutDownMiniClusters(); 567 baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG); 568 } 569 } 570 571 @After 572 public void tearDown() throws IOException { 573 configurations = null; 574 utilities = null; 575 } 576 577 @SuppressWarnings("resource") 578 private void startMiniClusters(int numClusters) throws Exception { 579 utilities = new HBaseTestingUtil[numClusters]; 580 configurations = new Configuration[numClusters]; 581 for (int i = 0; i < numClusters; i++) { 582 Configuration conf = new Configuration(baseConfiguration); 583 conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + ThreadLocalRandom.current().nextInt()); 584 HBaseTestingUtil utility = new HBaseTestingUtil(conf); 585 if (i == 0) { 586 utility.startMiniZKCluster(); 587 miniZK = utility.getZkCluster(); 588 } else { 589 utility.setZkCluster(miniZK); 590 } 591 utility.startMiniCluster(); 592 utilities[i] = utility; 593 configurations[i] = conf; 594 new ZKWatcher(conf, "cluster" + i, null, true); 595 } 596 } 597 598 private void shutDownMiniClusters() throws Exception { 599 int numClusters = utilities.length; 600 for (int i = numClusters - 1; i >= 0; i--) { 601 if (utilities[i] != null) { 602 utilities[i].shutdownMiniCluster(); 603 } 604 } 605 miniZK.shutdown(); 606 } 607 608 private void createTableOnClusters(TableDescriptor table) throws Exception { 609 for (HBaseTestingUtil utility : utilities) { 610 utility.getAdmin().createTable(table); 611 } 612 } 613 614 private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber) 615 throws Exception { 616 try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]); 617 Admin admin = conn.getAdmin()) { 618 admin.addReplicationPeer(id, ReplicationPeerConfig.newBuilder() 619 .setClusterKey(utilities[slaveClusterNumber].getRpcConnnectionURI()).build()); 620 } 621 } 622 623 private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs) 624 throws Exception { 625 try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]); 626 Admin admin = conn.getAdmin()) { 627 admin.addReplicationPeer(id, 628 ReplicationPeerConfig.newBuilder() 629 .setClusterKey(utilities[slaveClusterNumber].getRpcConnnectionURI()) 630 .setReplicateAllUserTables(false) 631 .setTableCFsMap(ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs)).build()); 632 } 633 } 634 635 private void disablePeer(String id, int masterClusterNumber) throws Exception { 636 try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]); 637 Admin admin = conn.getAdmin()) { 638 admin.disableReplicationPeer(id); 639 } 640 } 641 642 private void enablePeer(String id, int masterClusterNumber) throws Exception { 643 try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]); 644 Admin admin = conn.getAdmin()) { 645 admin.enableReplicationPeer(id); 646 } 647 } 648 649 private void close(Closeable... closeables) { 650 try { 651 if (closeables != null) { 652 for (Closeable closeable : closeables) { 653 closeable.close(); 654 } 655 } 656 } catch (Exception e) { 657 LOG.warn("Exception occurred while closing the object:", e); 658 } 659 } 660 661 @SuppressWarnings("resource") 662 private Table[] getHTablesOnClusters(TableName tableName) throws Exception { 663 int numClusters = utilities.length; 664 Table[] htables = new Table[numClusters]; 665 for (int i = 0; i < numClusters; i++) { 666 Table htable = ConnectionFactory.createConnection(configurations[i]).getTable(tableName); 667 htables[i] = htable; 668 } 669 return htables; 670 } 671 672 private void validateCounts(Table[] htables, byte[] type, int[] expectedCounts) 673 throws IOException { 674 for (int i = 0; i < htables.length; i++) { 675 assertEquals(Bytes.toString(type) + " were replicated back ", expectedCounts[i], 676 getCount(htables[i], type)); 677 } 678 } 679 680 private int getCount(Table t, byte[] type) throws IOException { 681 Get test = new Get(row); 682 test.setAttribute("count", new byte[] {}); 683 Result res = t.get(test); 684 return Bytes.toInt(res.getValue(count, type)); 685 } 686 687 private void deleteAndWait(byte[] row, Table source, Table target) throws Exception { 688 Delete del = new Delete(row); 689 source.delete(del); 690 wait(row, target, true); 691 } 692 693 private void putAndWait(byte[] row, byte[] fam, Table source, Table target) throws Exception { 694 Put put = new Put(row); 695 put.addColumn(fam, row, row); 696 source.put(put); 697 wait(row, target, false); 698 } 699 700 private void loadAndValidateHFileReplication(String testName, int masterNumber, 701 int[] slaveNumbers, byte[] row, byte[] fam, Table[] tables, byte[][][] hfileRanges, 702 int numOfRows, int[] expectedCounts, boolean toValidate) throws Exception { 703 HBaseTestingUtil util = utilities[masterNumber]; 704 705 Path dir = util.getDataTestDirOnTestFS(testName); 706 FileSystem fs = util.getTestFileSystem(); 707 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 708 Path familyDir = new Path(dir, Bytes.toString(fam)); 709 710 int hfileIdx = 0; 711 for (byte[][] range : hfileRanges) { 712 byte[] from = range[0]; 713 byte[] to = range[1]; 714 HFileTestUtil.createHFile(util.getConfiguration(), fs, 715 new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows); 716 } 717 718 Table source = tables[masterNumber]; 719 final TableName tableName = source.getName(); 720 BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(tableName, dir); 721 722 if (toValidate) { 723 for (int slaveClusterNumber : slaveNumbers) { 724 wait(slaveClusterNumber, tables[slaveClusterNumber], expectedCounts[slaveClusterNumber]); 725 } 726 } 727 } 728 729 private void wait(int slaveNumber, Table target, int expectedCount) 730 throws IOException, InterruptedException { 731 int count = 0; 732 for (int i = 0; i < NB_RETRIES; i++) { 733 if (i == NB_RETRIES - 1) { 734 fail("Waited too much time for bulkloaded data replication. Current count=" + count 735 + ", expected count=" + expectedCount); 736 } 737 count = utilities[slaveNumber].countRows(target); 738 if (count != expectedCount) { 739 LOG.info("Waiting more time for bulkloaded data replication."); 740 Thread.sleep(SLEEP_TIME); 741 } else { 742 break; 743 } 744 } 745 } 746 747 private void wait(byte[] row, Table target, boolean isDeleted) throws Exception { 748 Get get = new Get(row); 749 for (int i = 0; i < NB_RETRIES; i++) { 750 if (i == NB_RETRIES - 1) { 751 fail("Waited too much time for replication. Row:" + Bytes.toString(row) 752 + ". IsDeleteReplication:" + isDeleted); 753 } 754 Result res = target.get(get); 755 boolean sleep = isDeleted ? res.size() > 0 : res.isEmpty(); 756 if (sleep) { 757 LOG.info("Waiting for more time for replication. Row:" + Bytes.toString(row) 758 + ". IsDeleteReplication:" + isDeleted); 759 Thread.sleep(SLEEP_TIME); 760 } else { 761 if (!isDeleted) { 762 assertArrayEquals(res.value(), row); 763 } 764 LOG.info("Obtained row:" + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted); 765 break; 766 } 767 } 768 } 769 770 private void rollWALAndWait(final HBaseTestingUtil utility, final TableName table, 771 final byte[] row) throws IOException { 772 final Admin admin = utility.getAdmin(); 773 final SingleProcessHBaseCluster cluster = utility.getMiniHBaseCluster(); 774 775 // find the region that corresponds to the given row. 776 HRegion region = null; 777 for (HRegion candidate : cluster.getRegions(table)) { 778 if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) { 779 region = candidate; 780 break; 781 } 782 } 783 assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region); 784 785 final CountDownLatch latch = new CountDownLatch(1); 786 787 // listen for successful log rolls 788 final WALActionsListener listener = new WALActionsListener() { 789 @Override 790 public void postLogRoll(final Path oldPath, final Path newPath) throws IOException { 791 latch.countDown(); 792 } 793 }; 794 region.getWAL().registerWALActionsListener(listener); 795 796 // request a roll 797 admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDescriptor().getTableName(), 798 region.getRegionInfo().getRegionName())); 799 800 // wait 801 try { 802 latch.await(); 803 } catch (InterruptedException exception) { 804 LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " 805 + "replication tests fail, it's probably because we should still be waiting."); 806 Thread.currentThread().interrupt(); 807 } 808 region.getWAL().unregisterWALActionsListener(listener); 809 } 810 811 /** 812 * Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same 813 * timestamp there is otherwise no way to count them. 814 */ 815 public static class CoprocessorCounter implements RegionCoprocessor, RegionObserver { 816 private int nCount = 0; 817 private int nDelete = 0; 818 819 @Override 820 public Optional<RegionObserver> getRegionObserver() { 821 return Optional.of(this); 822 } 823 824 @Override 825 public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> e, 826 final Put put, final WALEdit edit, final Durability durability) throws IOException { 827 nCount++; 828 } 829 830 @Override 831 public void postDelete(final ObserverContext<? extends RegionCoprocessorEnvironment> c, 832 final Delete delete, final WALEdit edit, final Durability durability) throws IOException { 833 nDelete++; 834 } 835 836 @Override 837 public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> c, 838 final Get get, final List<Cell> result) throws IOException { 839 if (get.getAttribute("count") != null) { 840 result.clear(); 841 // order is important! 842 result.add(new KeyValue(count, count, delete, Bytes.toBytes(nDelete))); 843 result.add(new KeyValue(count, count, put, Bytes.toBytes(nCount))); 844 c.bypass(); 845 } 846 } 847 } 848 849}