001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025 026import java.io.IOException; 027import java.lang.reflect.Field; 028import java.net.URLEncoder; 029import java.util.ArrayList; 030import java.util.Collection; 031import java.util.HashMap; 032import java.util.List; 033import java.util.Map; 034import java.util.NavigableMap; 035import java.util.NavigableSet; 036import java.util.Set; 037import java.util.SortedSet; 038import java.util.TreeMap; 039import java.util.TreeSet; 040import java.util.UUID; 041import java.util.concurrent.CountDownLatch; 042import java.util.stream.Collectors; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.fs.FileSystem; 045import org.apache.hadoop.fs.Path; 046import org.apache.hadoop.hbase.ChoreService; 047import org.apache.hadoop.hbase.ClusterId; 048import org.apache.hadoop.hbase.CoordinatedStateManager; 049import org.apache.hadoop.hbase.HBaseClassTestRule; 050import org.apache.hadoop.hbase.HBaseConfiguration; 051import org.apache.hadoop.hbase.HBaseTestingUtility; 052import org.apache.hadoop.hbase.HColumnDescriptor; 053import org.apache.hadoop.hbase.HConstants; 054import org.apache.hadoop.hbase.HRegionInfo; 055import org.apache.hadoop.hbase.HTableDescriptor; 056import org.apache.hadoop.hbase.KeyValue; 057import org.apache.hadoop.hbase.Server; 058import org.apache.hadoop.hbase.ServerName; 059import org.apache.hadoop.hbase.TableName; 060import org.apache.hadoop.hbase.Waiter; 061import org.apache.hadoop.hbase.client.ClusterConnection; 062import org.apache.hadoop.hbase.client.Connection; 063import org.apache.hadoop.hbase.client.RegionInfo; 064import org.apache.hadoop.hbase.client.RegionInfoBuilder; 065import org.apache.hadoop.hbase.regionserver.HRegionServer; 066import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 067import org.apache.hadoop.hbase.replication.ReplicationFactory; 068import org.apache.hadoop.hbase.replication.ReplicationPeer; 069import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 070import org.apache.hadoop.hbase.replication.ReplicationPeers; 071import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 072import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; 073import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 074import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage; 075import org.apache.hadoop.hbase.testclassification.MediumTests; 076import org.apache.hadoop.hbase.testclassification.ReplicationTests; 077import org.apache.hadoop.hbase.util.Bytes; 078import org.apache.hadoop.hbase.util.CommonFSUtils; 079import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 080import org.apache.hadoop.hbase.util.JVMClusterUtil; 081import org.apache.hadoop.hbase.util.Pair; 082import org.apache.hadoop.hbase.wal.WAL; 083import org.apache.hadoop.hbase.wal.WALEdit; 084import org.apache.hadoop.hbase.wal.WALFactory; 085import org.apache.hadoop.hbase.wal.WALKeyImpl; 086import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 087import org.apache.hadoop.hbase.zookeeper.ZKUtil; 088import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 089import org.junit.After; 090import org.junit.AfterClass; 091import org.junit.Before; 092import org.junit.ClassRule; 093import org.junit.Rule; 094import org.junit.Test; 095import org.junit.experimental.categories.Category; 096import org.junit.rules.TestName; 097import org.slf4j.Logger; 098import org.slf4j.LoggerFactory; 099 100import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 101import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 102 103import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 104import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 105import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 106 107/** 108 * An abstract class that tests ReplicationSourceManager. Classes that extend this class should set 109 * up the proper config for this class and initialize the proper cluster using HBaseTestingUtility. 110 */ 111@Category({ ReplicationTests.class, MediumTests.class }) 112public abstract class TestReplicationSourceManager { 113 114 @ClassRule 115 public static final HBaseClassTestRule CLASS_RULE = 116 HBaseClassTestRule.forClass(TestReplicationSourceManager.class); 117 118 protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationSourceManager.class); 119 120 protected static Configuration conf; 121 122 protected static HBaseTestingUtility utility; 123 124 protected static Replication replication; 125 126 protected static ReplicationSourceManager manager; 127 128 protected static ReplicationSourceManager managerOfCluster; 129 130 protected static ZKWatcher zkw; 131 132 protected static HTableDescriptor htd; 133 134 protected static HRegionInfo hri; 135 136 protected static final byte[] r1 = Bytes.toBytes("r1"); 137 138 protected static final byte[] r2 = Bytes.toBytes("r2"); 139 140 protected static final byte[] f1 = Bytes.toBytes("f1"); 141 142 protected static final byte[] f2 = Bytes.toBytes("f2"); 143 144 protected static final TableName test = TableName.valueOf("test"); 145 146 protected static final String slaveId = "1"; 147 148 protected static FileSystem fs; 149 150 protected static Path oldLogDir; 151 152 protected static Path logDir; 153 154 protected static CountDownLatch latch; 155 156 protected static List<String> files = new ArrayList<>(); 157 protected static NavigableMap<byte[], Integer> scopes; 158 159 protected static void setupZkAndReplication() throws Exception { 160 // The implementing class should set up the conf 161 assertNotNull(conf); 162 zkw = new ZKWatcher(conf, "test", null); 163 ZKUtil.createWithParents(zkw, "/hbase/replication"); 164 ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1"); 165 ZKUtil.setData(zkw, "/hbase/replication/peers/1", 166 Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" 167 + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1")); 168 ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state"); 169 ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state", 170 ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); 171 ZKUtil.createWithParents(zkw, "/hbase/replication/state"); 172 ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); 173 174 ZKClusterId.setClusterId(zkw, new ClusterId()); 175 CommonFSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir()); 176 fs = FileSystem.get(conf); 177 oldLogDir = new Path(utility.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); 178 logDir = new Path(utility.getDataTestDir(), HConstants.HREGION_LOGDIR_NAME); 179 replication = new Replication(); 180 replication.initialize(new DummyServer(), fs, logDir, oldLogDir, 181 new WALFactory(conf, "test", null)); 182 managerOfCluster = getManagerFromCluster(); 183 if (managerOfCluster != null) { 184 // After replication procedure, we need to add peer by hand (other than by receiving 185 // notification from zk) 186 managerOfCluster.addPeer(slaveId); 187 } 188 189 manager = replication.getReplicationManager(); 190 manager.addSource(slaveId); 191 if (managerOfCluster != null) { 192 waitPeer(slaveId, managerOfCluster, true); 193 } 194 waitPeer(slaveId, manager, true); 195 196 htd = new HTableDescriptor(test); 197 HColumnDescriptor col = new HColumnDescriptor(f1); 198 col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); 199 htd.addFamily(col); 200 col = new HColumnDescriptor(f2); 201 col.setScope(HConstants.REPLICATION_SCOPE_LOCAL); 202 htd.addFamily(col); 203 204 scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 205 for (byte[] fam : htd.getFamiliesKeys()) { 206 scopes.put(fam, 0); 207 } 208 hri = new HRegionInfo(htd.getTableName(), r1, r2); 209 } 210 211 private static ReplicationSourceManager getManagerFromCluster() { 212 // TestReplicationSourceManagerZkImpl won't start the mini hbase cluster. 213 if (utility.getMiniHBaseCluster() == null) { 214 return null; 215 } 216 return utility.getMiniHBaseCluster().getRegionServerThreads().stream() 217 .map(JVMClusterUtil.RegionServerThread::getRegionServer).findAny() 218 .map(HRegionServer::getReplicationSourceService).map(r -> (Replication) r) 219 .map(Replication::getReplicationManager).get(); 220 } 221 222 @AfterClass 223 public static void tearDownAfterClass() throws Exception { 224 if (manager != null) { 225 manager.join(); 226 } 227 utility.shutdownMiniCluster(); 228 } 229 230 @Rule 231 public TestName testName = new TestName(); 232 233 private void cleanLogDir() throws IOException { 234 fs.delete(logDir, true); 235 fs.delete(oldLogDir, true); 236 } 237 238 @Before 239 public void setUp() throws Exception { 240 LOG.info("Start " + testName.getMethodName()); 241 cleanLogDir(); 242 } 243 244 @After 245 public void tearDown() throws Exception { 246 LOG.info("End " + testName.getMethodName()); 247 cleanLogDir(); 248 List<String> ids = manager.getSources().stream().map(ReplicationSourceInterface::getPeerId) 249 .collect(Collectors.toList()); 250 for (String id : ids) { 251 if (slaveId.equals(id)) { 252 continue; 253 } 254 removePeerAndWait(id); 255 } 256 } 257 258 @Test 259 public void testLogRoll() throws Exception { 260 long baseline = 1000; 261 long time = baseline; 262 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 263 KeyValue kv = new KeyValue(r1, f1, r1); 264 WALEdit edit = new WALEdit(); 265 edit.add(kv); 266 267 WALFactory wals = 268 new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8")); 269 ReplicationSourceManager replicationManager = replication.getReplicationManager(); 270 wals.getWALProvider() 271 .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager)); 272 final WAL wal = wals.getWAL(hri); 273 manager.init(); 274 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame")); 275 htd.addFamily(new HColumnDescriptor(f1)); 276 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 277 for (byte[] fam : htd.getFamiliesKeys()) { 278 scopes.put(fam, 0); 279 } 280 // Testing normal log rolling every 20 281 for (long i = 1; i < 101; i++) { 282 if (i > 1 && i % 20 == 0) { 283 wal.rollWriter(); 284 } 285 LOG.info(Long.toString(i)); 286 final long txid = wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test, 287 EnvironmentEdgeManager.currentTime(), mvcc, scopes), edit); 288 wal.sync(txid); 289 } 290 291 // Simulate a rapid insert that's followed 292 // by a report that's still not totally complete (missing last one) 293 LOG.info(baseline + " and " + time); 294 baseline += 101; 295 time = baseline; 296 LOG.info(baseline + " and " + time); 297 298 for (int i = 0; i < 3; i++) { 299 wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test, 300 EnvironmentEdgeManager.currentTime(), mvcc, scopes), edit); 301 } 302 wal.sync(); 303 304 int logNumber = 0; 305 for (Map.Entry<String, NavigableSet<String>> entry : manager.getWALs().get(slaveId) 306 .entrySet()) { 307 logNumber += entry.getValue().size(); 308 } 309 assertEquals(6, logNumber); 310 311 wal.rollWriter(); 312 313 manager.logPositionAndCleanOldLogs(manager.getSources().get(0), 314 new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath())); 315 316 wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test, 317 EnvironmentEdgeManager.currentTime(), mvcc, scopes), edit); 318 wal.sync(); 319 320 assertEquals(1, manager.getWALs().size()); 321 322 // TODO Need a case with only 2 WALs and we only want to delete the first one 323 } 324 325 @Test 326 public void testClaimQueues() throws Exception { 327 Server server = new DummyServer("hostname0.example.org"); 328 ReplicationQueueStorage rq = ReplicationStorageFactory 329 .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); 330 // populate some znodes in the peer znode 331 files.add("log1"); 332 files.add("log2"); 333 for (String file : files) { 334 rq.addWAL(server.getServerName(), "1", file); 335 } 336 // create 3 DummyServers 337 Server s1 = new DummyServer("dummyserver1.example.org"); 338 Server s2 = new DummyServer("dummyserver2.example.org"); 339 Server s3 = new DummyServer("dummyserver3.example.org"); 340 341 // create 3 DummyNodeFailoverWorkers 342 DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(server.getServerName(), s1); 343 DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(server.getServerName(), s2); 344 DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(server.getServerName(), s3); 345 346 latch = new CountDownLatch(3); 347 // start the threads 348 w1.start(); 349 w2.start(); 350 w3.start(); 351 // make sure only one is successful 352 int populatedMap = 0; 353 // wait for result now... till all the workers are done. 354 latch.await(); 355 populatedMap += 356 w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated() + w3.isLogZnodesMapPopulated(); 357 assertEquals(1, populatedMap); 358 server.abort("", null); 359 } 360 361 @Test 362 public void testCleanupFailoverQueues() throws Exception { 363 Server server = new DummyServer("hostname1.example.org"); 364 ReplicationQueueStorage rq = ReplicationStorageFactory 365 .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); 366 // populate some znodes in the peer znode 367 SortedSet<String> files = new TreeSet<>(); 368 String group = "testgroup"; 369 String file1 = group + ".log1"; 370 String file2 = group + ".log2"; 371 files.add(file1); 372 files.add(file2); 373 for (String file : files) { 374 rq.addWAL(server.getServerName(), "1", file); 375 } 376 Server s1 = new DummyServer("dummyserver1.example.org"); 377 ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getFileSystem(), 378 s1.getZooKeeper(), s1.getConfiguration()); 379 rp1.init(); 380 manager.claimQueue(server.getServerName(), "1"); 381 assertEquals(1, manager.getWalsByIdRecoveredQueues().size()); 382 String id = "1-" + server.getServerName().getServerName(); 383 assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group)); 384 manager.cleanOldLogs(file2, false, id, true); 385 // log1 should be deleted 386 assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group)); 387 } 388 389 @Test 390 public void testCleanupUnknownPeerZNode() throws Exception { 391 Server server = new DummyServer("hostname2.example.org"); 392 ReplicationQueueStorage rq = ReplicationStorageFactory 393 .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); 394 // populate some znodes in the peer znode 395 // add log to an unknown peer 396 String group = "testgroup"; 397 rq.addWAL(server.getServerName(), "2", group + ".log1"); 398 rq.addWAL(server.getServerName(), "2", group + ".log2"); 399 400 manager.claimQueue(server.getServerName(), "2"); 401 402 // The log of the unknown peer should be removed from zk 403 for (String peer : manager.getAllQueues()) { 404 assertTrue(peer.startsWith("1")); 405 } 406 } 407 408 /** 409 * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out the 410 * compaction WALEdit. 411 */ 412 @Test 413 public void testCompactionWALEdits() throws Exception { 414 TableName tableName = TableName.valueOf("testCompactionWALEdits"); 415 WALProtos.CompactionDescriptor compactionDescriptor = 416 WALProtos.CompactionDescriptor.getDefaultInstance(); 417 RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW) 418 .setEndKey(HConstants.EMPTY_END_ROW).build(); 419 WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor); 420 ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, conf); 421 } 422 423 @Test 424 public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception { 425 NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR); 426 // 1. Get the bulk load wal edit event 427 WALEdit logEdit = getBulkLoadWALEdit(scope); 428 // 2. Create wal key 429 WALKeyImpl logKey = new WALKeyImpl(scope); 430 431 // 3. Get the scopes for the key 432 ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, conf); 433 434 // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled 435 assertNull("No bulk load entries scope should be added if bulk load replication is disabled.", 436 logKey.getReplicationScopes()); 437 } 438 439 @Test 440 public void testBulkLoadWALEdits() throws Exception { 441 // 1. Get the bulk load wal edit event 442 NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR); 443 WALEdit logEdit = getBulkLoadWALEdit(scope); 444 // 2. Create wal key 445 WALKeyImpl logKey = new WALKeyImpl(scope); 446 // 3. Enable bulk load hfile replication 447 Configuration bulkLoadConf = HBaseConfiguration.create(conf); 448 bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); 449 450 // 4. Get the scopes for the key 451 ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, bulkLoadConf); 452 453 NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes(); 454 // Assert family with replication scope global is present in the key scopes 455 assertTrue("This family scope is set to global, should be part of replication key scopes.", 456 scopes.containsKey(f1)); 457 // Assert family with replication scope local is not present in the key scopes 458 assertFalse("This family scope is set to local, should not be part of replication key scopes", 459 scopes.containsKey(f2)); 460 } 461 462 /** 463 * Test whether calling removePeer() on a ReplicationSourceManager that failed on initializing the 464 * corresponding ReplicationSourceInterface correctly cleans up the corresponding replication 465 * queue and ReplicationPeer. See HBASE-16096. 466 */ 467 @Test 468 public void testPeerRemovalCleanup() throws Exception { 469 String replicationSourceImplName = conf.get("replication.replicationsource.implementation"); 470 final String peerId = "FakePeer"; 471 final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig() 472 .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase"); 473 try { 474 DummyServer server = new DummyServer(); 475 ReplicationQueueStorage rq = ReplicationStorageFactory 476 .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); 477 // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface 478 // initialization to throw an exception. 479 conf.set("replication.replicationsource.implementation", 480 FailInitializeDummyReplicationSource.class.getName()); 481 final ReplicationPeers rp = manager.getReplicationPeers(); 482 // Set up the znode and ReplicationPeer for the fake peer 483 // Don't wait for replication source to initialize, we know it won't. 484 addPeerAndWait(peerId, peerConfig, false); 485 486 // Sanity check 487 assertNull(manager.getSource(peerId)); 488 489 // Create a replication queue for the fake peer 490 rq.addWAL(server.getServerName(), peerId, "FakeFile"); 491 // Unregister peer, this should remove the peer and clear all queues associated with it 492 // Need to wait for the ReplicationTracker to pick up the changes and notify listeners. 493 removePeerAndWait(peerId); 494 assertFalse(rq.getAllQueues(server.getServerName()).contains(peerId)); 495 } finally { 496 conf.set("replication.replicationsource.implementation", replicationSourceImplName); 497 removePeerAndWait(peerId); 498 } 499 } 500 501 private static MetricsReplicationSourceSource getGlobalSource() throws Exception { 502 ReplicationSourceInterface source = manager.getSource(slaveId); 503 // Retrieve the global replication metrics source 504 Field f = MetricsSource.class.getDeclaredField("globalSourceSource"); 505 f.setAccessible(true); 506 return (MetricsReplicationSourceSource) f.get(source.getSourceMetrics()); 507 } 508 509 private static long getSizeOfLatestPath() { 510 // If no mini cluster is running, there are extra replication manager influencing the metrics. 511 if (utility.getMiniHBaseCluster() == null) { 512 return 0; 513 } 514 return utility.getMiniHBaseCluster().getRegionServerThreads().stream() 515 .map(JVMClusterUtil.RegionServerThread::getRegionServer) 516 .map(HRegionServer::getReplicationSourceService).map(r -> (Replication) r) 517 .map(Replication::getReplicationManager) 518 .mapToLong(ReplicationSourceManager::getSizeOfLatestPath).sum(); 519 } 520 521 @Test 522 public void testRemovePeerMetricsCleanup() throws Exception { 523 final String peerId = "DummyPeer"; 524 final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig() 525 .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase"); 526 try { 527 MetricsReplicationSourceSource globalSource = getGlobalSource(); 528 final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue(); 529 final long sizeOfLatestPath = getSizeOfLatestPath(); 530 addPeerAndWait(peerId, peerConfig, true); 531 assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); 532 ReplicationSourceInterface source = manager.getSource(peerId); 533 // Sanity check 534 assertNotNull(source); 535 final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue(); 536 // Enqueue log and check if metrics updated 537 source.enqueueLog(new Path("abc")); 538 assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue()); 539 assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, 540 globalSource.getSizeOfLogQueue()); 541 542 // Removing the peer should reset the global metrics 543 removePeerAndWait(peerId); 544 assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); 545 546 // Adding the same peer back again should reset the single source metrics 547 addPeerAndWait(peerId, peerConfig, true); 548 source = manager.getSource(peerId); 549 assertNotNull(source); 550 assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, 551 globalSource.getSizeOfLogQueue()); 552 } finally { 553 removePeerAndWait(peerId); 554 } 555 } 556 557 @Test 558 public void testDisablePeerMetricsCleanup() throws Exception { 559 final String peerId = "DummyPeer"; 560 final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() 561 .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build(); 562 try { 563 MetricsReplicationSourceSource globalSource = getGlobalSource(); 564 final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue(); 565 final long sizeOfLatestPath = getSizeOfLatestPath(); 566 addPeerAndWait(peerId, peerConfig, true); 567 assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); 568 ReplicationSourceInterface source = manager.getSource(peerId); 569 // Sanity check 570 assertNotNull(source); 571 final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue(); 572 // Enqueue log and check if metrics updated 573 source.enqueueLog(new Path("abc")); 574 assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue()); 575 assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, 576 globalSource.getSizeOfLogQueue()); 577 578 // Refreshing the peer should decrement the global and single source metrics 579 manager.refreshSources(peerId); 580 assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); 581 582 source = manager.getSource(peerId); 583 assertNotNull(source); 584 assertEquals(sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue()); 585 assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, 586 globalSource.getSizeOfLogQueue()); 587 } finally { 588 removePeerAndWait(peerId); 589 } 590 } 591 592 /** 593 * Add a peer and wait for it to initialize 594 * @param waitForSource Whether to wait for replication source to initialize 595 */ 596 private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig, 597 final boolean waitForSource) throws Exception { 598 final ReplicationPeers rp = manager.getReplicationPeers(); 599 rp.getPeerStorage().addPeer(peerId, peerConfig, true); 600 try { 601 manager.addPeer(peerId); 602 } catch (Exception e) { 603 // ignore the failed exception, because we'll test both success & failed case. 604 } 605 waitPeer(peerId, manager, waitForSource); 606 if (managerOfCluster != null) { 607 managerOfCluster.addPeer(peerId); 608 waitPeer(peerId, managerOfCluster, waitForSource); 609 } 610 } 611 612 private static void waitPeer(final String peerId, ReplicationSourceManager manager, 613 final boolean waitForSource) { 614 ReplicationPeers rp = manager.getReplicationPeers(); 615 Waiter.waitFor(conf, 20000, () -> { 616 if (waitForSource) { 617 ReplicationSourceInterface rs = manager.getSource(peerId); 618 if (rs == null) { 619 return false; 620 } 621 if (rs instanceof ReplicationSourceDummy) { 622 return ((ReplicationSourceDummy) rs).isStartup(); 623 } 624 return true; 625 } else { 626 return (rp.getPeer(peerId) != null); 627 } 628 }); 629 } 630 631 /** 632 * Remove a peer and wait for it to get cleaned up 633 */ 634 private void removePeerAndWait(final String peerId) throws Exception { 635 final ReplicationPeers rp = manager.getReplicationPeers(); 636 if (rp.getPeerStorage().listPeerIds().contains(peerId)) { 637 rp.getPeerStorage().removePeer(peerId); 638 try { 639 manager.removePeer(peerId); 640 } catch (Exception e) { 641 // ignore the failed exception and continue. 642 } 643 } 644 Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { 645 @Override 646 public boolean evaluate() throws Exception { 647 Collection<String> peers = rp.getPeerStorage().listPeerIds(); 648 return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null) 649 && (!peers.contains(peerId)) && manager.getSource(peerId) == null; 650 } 651 }); 652 } 653 654 @Test 655 public void testSameWALPrefix() throws IOException { 656 Set<String> latestWalsBefore = 657 manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet()); 658 String walName1 = "localhost,8080,12345-45678-Peer.34567"; 659 String walName2 = "localhost,8080,12345.56789"; 660 manager.preLogRoll(new Path(walName1)); 661 manager.preLogRoll(new Path(walName2)); 662 663 Set<String> latestWals = manager.getLastestPath().stream().map(Path::getName) 664 .filter(n -> !latestWalsBefore.contains(n)).collect(Collectors.toSet()); 665 assertEquals(2, latestWals.size()); 666 assertTrue(latestWals.contains(walName1)); 667 assertTrue(latestWals.contains(walName2)); 668 } 669 670 private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) { 671 // 1. Create store files for the families 672 Map<byte[], List<Path>> storeFiles = new HashMap<>(1); 673 Map<String, Long> storeFilesSize = new HashMap<>(1); 674 List<Path> p = new ArrayList<>(1); 675 Path hfilePath1 = new Path(Bytes.toString(f1)); 676 p.add(hfilePath1); 677 try { 678 storeFilesSize.put(hfilePath1.getName(), fs.getFileStatus(hfilePath1).getLen()); 679 } catch (IOException e) { 680 LOG.debug("Failed to calculate the size of hfile " + hfilePath1); 681 storeFilesSize.put(hfilePath1.getName(), 0L); 682 } 683 storeFiles.put(f1, p); 684 scope.put(f1, 1); 685 p = new ArrayList<>(1); 686 Path hfilePath2 = new Path(Bytes.toString(f2)); 687 p.add(hfilePath2); 688 try { 689 storeFilesSize.put(hfilePath2.getName(), fs.getFileStatus(hfilePath2).getLen()); 690 } catch (IOException e) { 691 LOG.debug("Failed to calculate the size of hfile " + hfilePath2); 692 storeFilesSize.put(hfilePath2.getName(), 0L); 693 } 694 storeFiles.put(f2, p); 695 // 2. Create bulk load descriptor 696 BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(hri.getTable(), 697 UnsafeByteOperations.unsafeWrap(hri.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1); 698 699 // 3. create bulk load wal edit event 700 WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc); 701 return logEdit; 702 } 703 704 static class DummyNodeFailoverWorker extends Thread { 705 private Map<String, Set<String>> logZnodesMap; 706 Server server; 707 private ServerName deadRS; 708 ReplicationQueueStorage rq; 709 710 public DummyNodeFailoverWorker(ServerName deadRS, Server s) throws Exception { 711 this.deadRS = deadRS; 712 this.server = s; 713 this.rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), 714 server.getConfiguration()); 715 } 716 717 @Override 718 public void run() { 719 try { 720 logZnodesMap = new HashMap<>(); 721 List<String> queues = rq.getAllQueues(deadRS); 722 for (String queue : queues) { 723 Pair<String, SortedSet<String>> pair = 724 rq.claimQueue(deadRS, queue, server.getServerName()); 725 if (pair != null) { 726 logZnodesMap.put(pair.getFirst(), pair.getSecond()); 727 } 728 } 729 server.abort("Done with testing", null); 730 } catch (Exception e) { 731 LOG.error("Got exception while running NodeFailoverWorker", e); 732 } finally { 733 latch.countDown(); 734 } 735 } 736 737 /** Returns 1 when the map is not empty. */ 738 private int isLogZnodesMapPopulated() { 739 Collection<Set<String>> sets = logZnodesMap.values(); 740 if (sets.size() > 1) { 741 throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size()); 742 } 743 if (sets.size() == 1) { 744 Set<String> s = sets.iterator().next(); 745 for (String file : files) { 746 // at least one file was missing 747 if (!s.contains(file)) { 748 return 0; 749 } 750 } 751 return 1; // we found all the files 752 } 753 return 0; 754 } 755 } 756 757 static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy { 758 759 @Override 760 public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, 761 ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId, 762 UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) 763 throws IOException { 764 throw new IOException("Failing deliberately"); 765 } 766 } 767 768 static class DummyServer implements Server { 769 String hostname; 770 771 DummyServer() { 772 hostname = "hostname.example.org"; 773 } 774 775 DummyServer(String hostname) { 776 this.hostname = hostname; 777 } 778 779 @Override 780 public Configuration getConfiguration() { 781 return conf; 782 } 783 784 @Override 785 public ZKWatcher getZooKeeper() { 786 return zkw; 787 } 788 789 @Override 790 public CoordinatedStateManager getCoordinatedStateManager() { 791 return null; 792 } 793 794 @Override 795 public ClusterConnection getConnection() { 796 return null; 797 } 798 799 @Override 800 public ServerName getServerName() { 801 return ServerName.valueOf(hostname, 1234, 1L); 802 } 803 804 @Override 805 public void abort(String why, Throwable e) { 806 // To change body of implemented methods use File | Settings | File Templates. 807 } 808 809 @Override 810 public boolean isAborted() { 811 return false; 812 } 813 814 @Override 815 public void stop(String why) { 816 // To change body of implemented methods use File | Settings | File Templates. 817 } 818 819 @Override 820 public boolean isStopped() { 821 return false; // To change body of implemented methods use File | Settings | File Templates. 822 } 823 824 @Override 825 public ChoreService getChoreService() { 826 return null; 827 } 828 829 @Override 830 public ClusterConnection getClusterConnection() { 831 return null; 832 } 833 834 @Override 835 public FileSystem getFileSystem() { 836 return fs; 837 } 838 839 @Override 840 public boolean isStopping() { 841 return false; 842 } 843 844 @Override 845 public Connection createConnection(Configuration conf) throws IOException { 846 return null; 847 } 848 } 849}