001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.lang.reflect.Field;
028import java.net.URLEncoder;
029import java.util.ArrayList;
030import java.util.Collection;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.NavigableMap;
035import java.util.NavigableSet;
036import java.util.Set;
037import java.util.SortedSet;
038import java.util.TreeMap;
039import java.util.TreeSet;
040import java.util.UUID;
041import java.util.concurrent.CountDownLatch;
042import java.util.stream.Collectors;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.fs.FileSystem;
045import org.apache.hadoop.fs.Path;
046import org.apache.hadoop.hbase.ChoreService;
047import org.apache.hadoop.hbase.ClusterId;
048import org.apache.hadoop.hbase.CoordinatedStateManager;
049import org.apache.hadoop.hbase.HBaseClassTestRule;
050import org.apache.hadoop.hbase.HBaseConfiguration;
051import org.apache.hadoop.hbase.HBaseTestingUtility;
052import org.apache.hadoop.hbase.HColumnDescriptor;
053import org.apache.hadoop.hbase.HConstants;
054import org.apache.hadoop.hbase.HRegionInfo;
055import org.apache.hadoop.hbase.HTableDescriptor;
056import org.apache.hadoop.hbase.KeyValue;
057import org.apache.hadoop.hbase.Server;
058import org.apache.hadoop.hbase.ServerName;
059import org.apache.hadoop.hbase.TableName;
060import org.apache.hadoop.hbase.Waiter;
061import org.apache.hadoop.hbase.client.ClusterConnection;
062import org.apache.hadoop.hbase.client.Connection;
063import org.apache.hadoop.hbase.client.RegionInfo;
064import org.apache.hadoop.hbase.client.RegionInfoBuilder;
065import org.apache.hadoop.hbase.regionserver.HRegionServer;
066import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
067import org.apache.hadoop.hbase.replication.ReplicationFactory;
068import org.apache.hadoop.hbase.replication.ReplicationPeer;
069import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
070import org.apache.hadoop.hbase.replication.ReplicationPeers;
071import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
072import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
073import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
074import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
075import org.apache.hadoop.hbase.testclassification.MediumTests;
076import org.apache.hadoop.hbase.testclassification.ReplicationTests;
077import org.apache.hadoop.hbase.util.Bytes;
078import org.apache.hadoop.hbase.util.CommonFSUtils;
079import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
080import org.apache.hadoop.hbase.util.JVMClusterUtil;
081import org.apache.hadoop.hbase.util.Pair;
082import org.apache.hadoop.hbase.wal.WAL;
083import org.apache.hadoop.hbase.wal.WALEdit;
084import org.apache.hadoop.hbase.wal.WALFactory;
085import org.apache.hadoop.hbase.wal.WALKeyImpl;
086import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
087import org.apache.hadoop.hbase.zookeeper.ZKUtil;
088import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
089import org.junit.After;
090import org.junit.AfterClass;
091import org.junit.Before;
092import org.junit.ClassRule;
093import org.junit.Rule;
094import org.junit.Test;
095import org.junit.experimental.categories.Category;
096import org.junit.rules.TestName;
097import org.slf4j.Logger;
098import org.slf4j.LoggerFactory;
099
100import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
101import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
102
103import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
104import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
106
107/**
108 * An abstract class that tests ReplicationSourceManager. Classes that extend this class should set
109 * up the proper config for this class and initialize the proper cluster using HBaseTestingUtility.
110 */
111@Category({ ReplicationTests.class, MediumTests.class })
112public abstract class TestReplicationSourceManager {
113
114  @ClassRule
115  public static final HBaseClassTestRule CLASS_RULE =
116    HBaseClassTestRule.forClass(TestReplicationSourceManager.class);
117
  // Logger used by the tests in this class and by its nested helper classes.
  protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationSourceManager.class);

  // Test configuration; the implementing class must assign it before setupZkAndReplication() runs.
  protected static Configuration conf;

  // Testing utility that supplies the data test directory and, when started, the mini cluster.
  protected static HBaseTestingUtility utility;

  // Replication instance created and initialized by setupZkAndReplication().
  protected static Replication replication;

  // Source manager obtained from the replication instance above; the one under test.
  protected static ReplicationSourceManager manager;

  // Source manager of a region server in the mini cluster, or null when no mini cluster is running.
  protected static ReplicationSourceManager managerOfCluster;

  // ZooKeeper watcher created by setupZkAndReplication().
  protected static ZKWatcher zkw;

  // Descriptor of table "test" with family f1 (global scope) and family f2 (local scope).
  protected static HTableDescriptor htd;

  // Region info built from htd's table name with r1 and r2 as its keys.
  protected static HRegionInfo hri;

  // Row key "r1".
  protected static final byte[] r1 = Bytes.toBytes("r1");

  // Row key "r2".
  protected static final byte[] r2 = Bytes.toBytes("r2");

  // Column family "f1".
  protected static final byte[] f1 = Bytes.toBytes("f1");

  // Column family "f2".
  protected static final byte[] f2 = Bytes.toBytes("f2");

  // Name of the table used by the tests.
  protected static final TableName test = TableName.valueOf("test");

  // Id of the peer that setupZkAndReplication() registers and that tearDown() never removes.
  protected static final String slaveId = "1";

  // File system obtained from conf in setupZkAndReplication().
  protected static FileSystem fs;

  // Old-WAL directory under the data test directory; deleted around every test.
  protected static Path oldLogDir;

  // WAL directory under the data test directory; deleted around every test.
  protected static Path logDir;

  // Assigned by testClaimQueues(); every DummyNodeFailoverWorker counts it down when run() ends.
  protected static CountDownLatch latch;

  // WAL names added by testClaimQueues() and read by DummyNodeFailoverWorker.
  protected static List<String> files = new ArrayList<>();
  // Maps every family of htd to 0; filled by setupZkAndReplication().
  protected static NavigableMap<byte[], Integer> scopes;
158
159  protected static void setupZkAndReplication() throws Exception {
160    // The implementing class should set up the conf
161    assertNotNull(conf);
162    zkw = new ZKWatcher(conf, "test", null);
163    ZKUtil.createWithParents(zkw, "/hbase/replication");
164    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
165    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
166      Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
167        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
168    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
169    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
170      ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
171    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
172    ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
173
174    ZKClusterId.setClusterId(zkw, new ClusterId());
175    CommonFSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
176    fs = FileSystem.get(conf);
177    oldLogDir = new Path(utility.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
178    logDir = new Path(utility.getDataTestDir(), HConstants.HREGION_LOGDIR_NAME);
179    replication = new Replication();
180    replication.initialize(new DummyServer(), fs, logDir, oldLogDir,
181      new WALFactory(conf, "test", null));
182    managerOfCluster = getManagerFromCluster();
183    if (managerOfCluster != null) {
184      // After replication procedure, we need to add peer by hand (other than by receiving
185      // notification from zk)
186      managerOfCluster.addPeer(slaveId);
187    }
188
189    manager = replication.getReplicationManager();
190    manager.addSource(slaveId);
191    if (managerOfCluster != null) {
192      waitPeer(slaveId, managerOfCluster, true);
193    }
194    waitPeer(slaveId, manager, true);
195
196    htd = new HTableDescriptor(test);
197    HColumnDescriptor col = new HColumnDescriptor(f1);
198    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
199    htd.addFamily(col);
200    col = new HColumnDescriptor(f2);
201    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
202    htd.addFamily(col);
203
204    scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
205    for (byte[] fam : htd.getFamiliesKeys()) {
206      scopes.put(fam, 0);
207    }
208    hri = new HRegionInfo(htd.getTableName(), r1, r2);
209  }
210
211  private static ReplicationSourceManager getManagerFromCluster() {
212    // TestReplicationSourceManagerZkImpl won't start the mini hbase cluster.
213    if (utility.getMiniHBaseCluster() == null) {
214      return null;
215    }
216    return utility.getMiniHBaseCluster().getRegionServerThreads().stream()
217      .map(JVMClusterUtil.RegionServerThread::getRegionServer).findAny()
218      .map(HRegionServer::getReplicationSourceService).map(r -> (Replication) r)
219      .map(Replication::getReplicationManager).get();
220  }
221
222  @AfterClass
223  public static void tearDownAfterClass() throws Exception {
224    if (manager != null) {
225      manager.join();
226    }
227    utility.shutdownMiniCluster();
228  }
229
  // Exposes the running test method's name; used for the "Start"/"End" log lines.
  @Rule
  public TestName testName = new TestName();
232
233  private void cleanLogDir() throws IOException {
234    fs.delete(logDir, true);
235    fs.delete(oldLogDir, true);
236  }
237
238  @Before
239  public void setUp() throws Exception {
240    LOG.info("Start " + testName.getMethodName());
241    cleanLogDir();
242  }
243
244  @After
245  public void tearDown() throws Exception {
246    LOG.info("End " + testName.getMethodName());
247    cleanLogDir();
248    List<String> ids = manager.getSources().stream().map(ReplicationSourceInterface::getPeerId)
249      .collect(Collectors.toList());
250    for (String id : ids) {
251      if (slaveId.equals(id)) {
252        continue;
253      }
254      removePeerAndWait(id);
255    }
256  }
257
258  @Test
259  public void testLogRoll() throws Exception {
260    long baseline = 1000;
261    long time = baseline;
262    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
263    KeyValue kv = new KeyValue(r1, f1, r1);
264    WALEdit edit = new WALEdit();
265    edit.add(kv);
266
267    WALFactory wals =
268      new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8"));
269    ReplicationSourceManager replicationManager = replication.getReplicationManager();
270    wals.getWALProvider()
271      .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
272    final WAL wal = wals.getWAL(hri);
273    manager.init();
274    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame"));
275    htd.addFamily(new HColumnDescriptor(f1));
276    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
277    for (byte[] fam : htd.getFamiliesKeys()) {
278      scopes.put(fam, 0);
279    }
280    // Testing normal log rolling every 20
281    for (long i = 1; i < 101; i++) {
282      if (i > 1 && i % 20 == 0) {
283        wal.rollWriter();
284      }
285      LOG.info(Long.toString(i));
286      final long txid = wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test,
287        EnvironmentEdgeManager.currentTime(), mvcc, scopes), edit);
288      wal.sync(txid);
289    }
290
291    // Simulate a rapid insert that's followed
292    // by a report that's still not totally complete (missing last one)
293    LOG.info(baseline + " and " + time);
294    baseline += 101;
295    time = baseline;
296    LOG.info(baseline + " and " + time);
297
298    for (int i = 0; i < 3; i++) {
299      wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test,
300        EnvironmentEdgeManager.currentTime(), mvcc, scopes), edit);
301    }
302    wal.sync();
303
304    int logNumber = 0;
305    for (Map.Entry<String, NavigableSet<String>> entry : manager.getWALs().get(slaveId)
306      .entrySet()) {
307      logNumber += entry.getValue().size();
308    }
309    assertEquals(6, logNumber);
310
311    wal.rollWriter();
312
313    manager.logPositionAndCleanOldLogs(manager.getSources().get(0),
314      new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
315
316    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test,
317      EnvironmentEdgeManager.currentTime(), mvcc, scopes), edit);
318    wal.sync();
319
320    assertEquals(1, manager.getWALs().size());
321
322    // TODO Need a case with only 2 WALs and we only want to delete the first one
323  }
324
325  @Test
326  public void testClaimQueues() throws Exception {
327    Server server = new DummyServer("hostname0.example.org");
328    ReplicationQueueStorage rq = ReplicationStorageFactory
329      .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
330    // populate some znodes in the peer znode
331    files.add("log1");
332    files.add("log2");
333    for (String file : files) {
334      rq.addWAL(server.getServerName(), "1", file);
335    }
336    // create 3 DummyServers
337    Server s1 = new DummyServer("dummyserver1.example.org");
338    Server s2 = new DummyServer("dummyserver2.example.org");
339    Server s3 = new DummyServer("dummyserver3.example.org");
340
341    // create 3 DummyNodeFailoverWorkers
342    DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(server.getServerName(), s1);
343    DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(server.getServerName(), s2);
344    DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(server.getServerName(), s3);
345
346    latch = new CountDownLatch(3);
347    // start the threads
348    w1.start();
349    w2.start();
350    w3.start();
351    // make sure only one is successful
352    int populatedMap = 0;
353    // wait for result now... till all the workers are done.
354    latch.await();
355    populatedMap +=
356      w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated() + w3.isLogZnodesMapPopulated();
357    assertEquals(1, populatedMap);
358    server.abort("", null);
359  }
360
361  @Test
362  public void testCleanupFailoverQueues() throws Exception {
363    Server server = new DummyServer("hostname1.example.org");
364    ReplicationQueueStorage rq = ReplicationStorageFactory
365      .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
366    // populate some znodes in the peer znode
367    SortedSet<String> files = new TreeSet<>();
368    String group = "testgroup";
369    String file1 = group + ".log1";
370    String file2 = group + ".log2";
371    files.add(file1);
372    files.add(file2);
373    for (String file : files) {
374      rq.addWAL(server.getServerName(), "1", file);
375    }
376    Server s1 = new DummyServer("dummyserver1.example.org");
377    ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getFileSystem(),
378      s1.getZooKeeper(), s1.getConfiguration());
379    rp1.init();
380    manager.claimQueue(server.getServerName(), "1");
381    assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
382    String id = "1-" + server.getServerName().getServerName();
383    assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
384    manager.cleanOldLogs(file2, false, id, true);
385    // log1 should be deleted
386    assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
387  }
388
389  @Test
390  public void testCleanupUnknownPeerZNode() throws Exception {
391    Server server = new DummyServer("hostname2.example.org");
392    ReplicationQueueStorage rq = ReplicationStorageFactory
393      .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
394    // populate some znodes in the peer znode
395    // add log to an unknown peer
396    String group = "testgroup";
397    rq.addWAL(server.getServerName(), "2", group + ".log1");
398    rq.addWAL(server.getServerName(), "2", group + ".log2");
399
400    manager.claimQueue(server.getServerName(), "2");
401
402    // The log of the unknown peer should be removed from zk
403    for (String peer : manager.getAllQueues()) {
404      assertTrue(peer.startsWith("1"));
405    }
406  }
407
408  /**
409   * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out the
410   * compaction WALEdit.
411   */
412  @Test
413  public void testCompactionWALEdits() throws Exception {
414    TableName tableName = TableName.valueOf("testCompactionWALEdits");
415    WALProtos.CompactionDescriptor compactionDescriptor =
416      WALProtos.CompactionDescriptor.getDefaultInstance();
417    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW)
418      .setEndKey(HConstants.EMPTY_END_ROW).build();
419    WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
420    ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, conf);
421  }
422
423  @Test
424  public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
425    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
426    // 1. Get the bulk load wal edit event
427    WALEdit logEdit = getBulkLoadWALEdit(scope);
428    // 2. Create wal key
429    WALKeyImpl logKey = new WALKeyImpl(scope);
430
431    // 3. Get the scopes for the key
432    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, conf);
433
434    // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled
435    assertNull("No bulk load entries scope should be added if bulk load replication is disabled.",
436      logKey.getReplicationScopes());
437  }
438
439  @Test
440  public void testBulkLoadWALEdits() throws Exception {
441    // 1. Get the bulk load wal edit event
442    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
443    WALEdit logEdit = getBulkLoadWALEdit(scope);
444    // 2. Create wal key
445    WALKeyImpl logKey = new WALKeyImpl(scope);
446    // 3. Enable bulk load hfile replication
447    Configuration bulkLoadConf = HBaseConfiguration.create(conf);
448    bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
449
450    // 4. Get the scopes for the key
451    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, bulkLoadConf);
452
453    NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
454    // Assert family with replication scope global is present in the key scopes
455    assertTrue("This family scope is set to global, should be part of replication key scopes.",
456      scopes.containsKey(f1));
457    // Assert family with replication scope local is not present in the key scopes
458    assertFalse("This family scope is set to local, should not be part of replication key scopes",
459      scopes.containsKey(f2));
460  }
461
462  /**
463   * Test whether calling removePeer() on a ReplicationSourceManager that failed on initializing the
464   * corresponding ReplicationSourceInterface correctly cleans up the corresponding replication
465   * queue and ReplicationPeer. See HBASE-16096.
466   */
467  @Test
468  public void testPeerRemovalCleanup() throws Exception {
469    String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
470    final String peerId = "FakePeer";
471    final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
472      .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase");
473    try {
474      DummyServer server = new DummyServer();
475      ReplicationQueueStorage rq = ReplicationStorageFactory
476        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
477      // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
478      // initialization to throw an exception.
479      conf.set("replication.replicationsource.implementation",
480        FailInitializeDummyReplicationSource.class.getName());
481      final ReplicationPeers rp = manager.getReplicationPeers();
482      // Set up the znode and ReplicationPeer for the fake peer
483      // Don't wait for replication source to initialize, we know it won't.
484      addPeerAndWait(peerId, peerConfig, false);
485
486      // Sanity check
487      assertNull(manager.getSource(peerId));
488
489      // Create a replication queue for the fake peer
490      rq.addWAL(server.getServerName(), peerId, "FakeFile");
491      // Unregister peer, this should remove the peer and clear all queues associated with it
492      // Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
493      removePeerAndWait(peerId);
494      assertFalse(rq.getAllQueues(server.getServerName()).contains(peerId));
495    } finally {
496      conf.set("replication.replicationsource.implementation", replicationSourceImplName);
497      removePeerAndWait(peerId);
498    }
499  }
500
501  private static MetricsReplicationSourceSource getGlobalSource() throws Exception {
502    ReplicationSourceInterface source = manager.getSource(slaveId);
503    // Retrieve the global replication metrics source
504    Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
505    f.setAccessible(true);
506    return (MetricsReplicationSourceSource) f.get(source.getSourceMetrics());
507  }
508
509  private static long getSizeOfLatestPath() {
510    // If no mini cluster is running, there are extra replication manager influencing the metrics.
511    if (utility.getMiniHBaseCluster() == null) {
512      return 0;
513    }
514    return utility.getMiniHBaseCluster().getRegionServerThreads().stream()
515      .map(JVMClusterUtil.RegionServerThread::getRegionServer)
516      .map(HRegionServer::getReplicationSourceService).map(r -> (Replication) r)
517      .map(Replication::getReplicationManager)
518      .mapToLong(ReplicationSourceManager::getSizeOfLatestPath).sum();
519  }
520
521  @Test
522  public void testRemovePeerMetricsCleanup() throws Exception {
523    final String peerId = "DummyPeer";
524    final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
525      .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase");
526    try {
527      MetricsReplicationSourceSource globalSource = getGlobalSource();
528      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
529      final long sizeOfLatestPath = getSizeOfLatestPath();
530      addPeerAndWait(peerId, peerConfig, true);
531      assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
532      ReplicationSourceInterface source = manager.getSource(peerId);
533      // Sanity check
534      assertNotNull(source);
535      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
536      // Enqueue log and check if metrics updated
537      source.enqueueLog(new Path("abc"));
538      assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
539      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
540        globalSource.getSizeOfLogQueue());
541
542      // Removing the peer should reset the global metrics
543      removePeerAndWait(peerId);
544      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
545
546      // Adding the same peer back again should reset the single source metrics
547      addPeerAndWait(peerId, peerConfig, true);
548      source = manager.getSource(peerId);
549      assertNotNull(source);
550      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
551        globalSource.getSizeOfLogQueue());
552    } finally {
553      removePeerAndWait(peerId);
554    }
555  }
556
557  @Test
558  public void testDisablePeerMetricsCleanup() throws Exception {
559    final String peerId = "DummyPeer";
560    final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
561      .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build();
562    try {
563      MetricsReplicationSourceSource globalSource = getGlobalSource();
564      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
565      final long sizeOfLatestPath = getSizeOfLatestPath();
566      addPeerAndWait(peerId, peerConfig, true);
567      assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
568      ReplicationSourceInterface source = manager.getSource(peerId);
569      // Sanity check
570      assertNotNull(source);
571      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
572      // Enqueue log and check if metrics updated
573      source.enqueueLog(new Path("abc"));
574      assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
575      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
576        globalSource.getSizeOfLogQueue());
577
578      // Refreshing the peer should decrement the global and single source metrics
579      manager.refreshSources(peerId);
580      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
581
582      source = manager.getSource(peerId);
583      assertNotNull(source);
584      assertEquals(sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
585      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
586        globalSource.getSizeOfLogQueue());
587    } finally {
588      removePeerAndWait(peerId);
589    }
590  }
591
592  /**
593   * Add a peer and wait for it to initialize
594   * @param waitForSource Whether to wait for replication source to initialize
595   */
596  private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
597    final boolean waitForSource) throws Exception {
598    final ReplicationPeers rp = manager.getReplicationPeers();
599    rp.getPeerStorage().addPeer(peerId, peerConfig, true);
600    try {
601      manager.addPeer(peerId);
602    } catch (Exception e) {
603      // ignore the failed exception, because we'll test both success & failed case.
604    }
605    waitPeer(peerId, manager, waitForSource);
606    if (managerOfCluster != null) {
607      managerOfCluster.addPeer(peerId);
608      waitPeer(peerId, managerOfCluster, waitForSource);
609    }
610  }
611
612  private static void waitPeer(final String peerId, ReplicationSourceManager manager,
613    final boolean waitForSource) {
614    ReplicationPeers rp = manager.getReplicationPeers();
615    Waiter.waitFor(conf, 20000, () -> {
616      if (waitForSource) {
617        ReplicationSourceInterface rs = manager.getSource(peerId);
618        if (rs == null) {
619          return false;
620        }
621        if (rs instanceof ReplicationSourceDummy) {
622          return ((ReplicationSourceDummy) rs).isStartup();
623        }
624        return true;
625      } else {
626        return (rp.getPeer(peerId) != null);
627      }
628    });
629  }
630
631  /**
632   * Remove a peer and wait for it to get cleaned up
633   */
634  private void removePeerAndWait(final String peerId) throws Exception {
635    final ReplicationPeers rp = manager.getReplicationPeers();
636    if (rp.getPeerStorage().listPeerIds().contains(peerId)) {
637      rp.getPeerStorage().removePeer(peerId);
638      try {
639        manager.removePeer(peerId);
640      } catch (Exception e) {
641        // ignore the failed exception and continue.
642      }
643    }
644    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
645      @Override
646      public boolean evaluate() throws Exception {
647        Collection<String> peers = rp.getPeerStorage().listPeerIds();
648        return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null)
649          && (!peers.contains(peerId)) && manager.getSource(peerId) == null;
650      }
651    });
652  }
653
654  @Test
655  public void testSameWALPrefix() throws IOException {
656    Set<String> latestWalsBefore =
657      manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet());
658    String walName1 = "localhost,8080,12345-45678-Peer.34567";
659    String walName2 = "localhost,8080,12345.56789";
660    manager.preLogRoll(new Path(walName1));
661    manager.preLogRoll(new Path(walName2));
662
663    Set<String> latestWals = manager.getLastestPath().stream().map(Path::getName)
664      .filter(n -> !latestWalsBefore.contains(n)).collect(Collectors.toSet());
665    assertEquals(2, latestWals.size());
666    assertTrue(latestWals.contains(walName1));
667    assertTrue(latestWals.contains(walName2));
668  }
669
670  private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
671    // 1. Create store files for the families
672    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
673    Map<String, Long> storeFilesSize = new HashMap<>(1);
674    List<Path> p = new ArrayList<>(1);
675    Path hfilePath1 = new Path(Bytes.toString(f1));
676    p.add(hfilePath1);
677    try {
678      storeFilesSize.put(hfilePath1.getName(), fs.getFileStatus(hfilePath1).getLen());
679    } catch (IOException e) {
680      LOG.debug("Failed to calculate the size of hfile " + hfilePath1);
681      storeFilesSize.put(hfilePath1.getName(), 0L);
682    }
683    storeFiles.put(f1, p);
684    scope.put(f1, 1);
685    p = new ArrayList<>(1);
686    Path hfilePath2 = new Path(Bytes.toString(f2));
687    p.add(hfilePath2);
688    try {
689      storeFilesSize.put(hfilePath2.getName(), fs.getFileStatus(hfilePath2).getLen());
690    } catch (IOException e) {
691      LOG.debug("Failed to calculate the size of hfile " + hfilePath2);
692      storeFilesSize.put(hfilePath2.getName(), 0L);
693    }
694    storeFiles.put(f2, p);
695    // 2. Create bulk load descriptor
696    BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
697      UnsafeByteOperations.unsafeWrap(hri.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1);
698
699    // 3. create bulk load wal edit event
700    WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc);
701    return logEdit;
702  }
703
704  static class DummyNodeFailoverWorker extends Thread {
705    private Map<String, Set<String>> logZnodesMap;
706    Server server;
707    private ServerName deadRS;
708    ReplicationQueueStorage rq;
709
710    public DummyNodeFailoverWorker(ServerName deadRS, Server s) throws Exception {
711      this.deadRS = deadRS;
712      this.server = s;
713      this.rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(),
714        server.getConfiguration());
715    }
716
717    @Override
718    public void run() {
719      try {
720        logZnodesMap = new HashMap<>();
721        List<String> queues = rq.getAllQueues(deadRS);
722        for (String queue : queues) {
723          Pair<String, SortedSet<String>> pair =
724            rq.claimQueue(deadRS, queue, server.getServerName());
725          if (pair != null) {
726            logZnodesMap.put(pair.getFirst(), pair.getSecond());
727          }
728        }
729        server.abort("Done with testing", null);
730      } catch (Exception e) {
731        LOG.error("Got exception while running NodeFailoverWorker", e);
732      } finally {
733        latch.countDown();
734      }
735    }
736
737    /** Returns 1 when the map is not empty. */
738    private int isLogZnodesMapPopulated() {
739      Collection<Set<String>> sets = logZnodesMap.values();
740      if (sets.size() > 1) {
741        throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size());
742      }
743      if (sets.size() == 1) {
744        Set<String> s = sets.iterator().next();
745        for (String file : files) {
746          // at least one file was missing
747          if (!s.contains(file)) {
748            return 0;
749          }
750        }
751        return 1; // we found all the files
752      }
753      return 0;
754    }
755  }
756
757  static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {
758
759    @Override
760    public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
761      ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
762      UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
763      throws IOException {
764      throw new IOException("Failing deliberately");
765    }
766  }
767
768  static class DummyServer implements Server {
769    String hostname;
770
771    DummyServer() {
772      hostname = "hostname.example.org";
773    }
774
775    DummyServer(String hostname) {
776      this.hostname = hostname;
777    }
778
779    @Override
780    public Configuration getConfiguration() {
781      return conf;
782    }
783
784    @Override
785    public ZKWatcher getZooKeeper() {
786      return zkw;
787    }
788
789    @Override
790    public CoordinatedStateManager getCoordinatedStateManager() {
791      return null;
792    }
793
794    @Override
795    public ClusterConnection getConnection() {
796      return null;
797    }
798
799    @Override
800    public ServerName getServerName() {
801      return ServerName.valueOf(hostname, 1234, 1L);
802    }
803
804    @Override
805    public void abort(String why, Throwable e) {
806      // To change body of implemented methods use File | Settings | File Templates.
807    }
808
809    @Override
810    public boolean isAborted() {
811      return false;
812    }
813
814    @Override
815    public void stop(String why) {
816      // To change body of implemented methods use File | Settings | File Templates.
817    }
818
819    @Override
820    public boolean isStopped() {
821      return false; // To change body of implemented methods use File | Settings | File Templates.
822    }
823
824    @Override
825    public ChoreService getChoreService() {
826      return null;
827    }
828
829    @Override
830    public ClusterConnection getClusterConnection() {
831      return null;
832    }
833
834    @Override
835    public FileSystem getFileSystem() {
836      return fs;
837    }
838
839    @Override
840    public boolean isStopping() {
841      return false;
842    }
843
844    @Override
845    public Connection createConnection(Configuration conf) throws IOException {
846      return null;
847    }
848  }
849}