001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.hasItems;
022import static org.hamcrest.Matchers.hasSize;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNotNull;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.when;
028
029import java.io.IOException;
030import java.util.Collections;
031import java.util.NavigableMap;
032import java.util.Set;
033import java.util.TreeMap;
034import java.util.stream.Collectors;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.CellBuilderType;
040import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
041import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseTestingUtil;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.Server;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.RegionInfo;
049import org.apache.hadoop.hbase.client.RegionInfoBuilder;
050import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
051import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
052import org.apache.hadoop.hbase.replication.ReplicationException;
053import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
054import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
055import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
056import org.apache.hadoop.hbase.replication.ReplicationPeers;
057import org.apache.hadoop.hbase.replication.ReplicationQueueId;
058import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
059import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
060import org.apache.hadoop.hbase.replication.ReplicationUtils;
061import org.apache.hadoop.hbase.replication.SyncReplicationState;
062import org.apache.hadoop.hbase.testclassification.MediumTests;
063import org.apache.hadoop.hbase.testclassification.ReplicationTests;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.CommonFSUtils;
066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
067import org.apache.hadoop.hbase.wal.WAL;
068import org.apache.hadoop.hbase.wal.WALEdit;
069import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
070import org.apache.hadoop.hbase.wal.WALFactory;
071import org.apache.hadoop.hbase.wal.WALKeyImpl;
072import org.hamcrest.Matchers;
073import org.junit.After;
074import org.junit.AfterClass;
075import org.junit.Before;
076import org.junit.BeforeClass;
077import org.junit.ClassRule;
078import org.junit.Rule;
079import org.junit.Test;
080import org.junit.experimental.categories.Category;
081import org.junit.rules.TestName;
082
083import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
084
085@Category({ ReplicationTests.class, MediumTests.class })
086public class TestReplicationSourceManager {
087
  // Class-level JUnit rule, created by HBaseClassTestRule for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSourceManager.class);
091
092  public static final class ReplicationEndpointForTest extends DummyReplicationEndpoint {
093
094    private String clusterKey;
095
096    @Override
097    public boolean replicate(ReplicateContext replicateContext) {
098      // if you want to block the replication, for example, do not want the recovered source to be
099      // removed
100      if (clusterKey.endsWith("error")) {
101        throw new RuntimeException("Inject error");
102      }
103      return true;
104    }
105
106    @Override
107    public void init(Context context) throws IOException {
108      super.init(context);
109      this.clusterKey = context.getReplicationPeer().getPeerConfig().getClusterKey();
110    }
111
112  }
113
  // Testing utility shared by all tests; its mini cluster is started in setUpBeforeClass and shut
  // down in tearDownAfterClass.
  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  // Copy of the cluster configuration made in setUpBeforeClass. setUp overwrites its root dir and
  // replication queue table name for every test method.
  private static Configuration CONF;

  // Test file system obtained from UTIL in setUpBeforeClass.
  private static FileSystem FS;

  // Column family names; also reused as row, qualifier and value when writing WAL cells.
  private static final byte[] F1 = Bytes.toBytes("f1");

  private static final byte[] F2 = Bytes.toBytes("f2");

  private static final TableName TABLE_NAME = TableName.valueOf("test");

  // Region info built for TABLE_NAME in setUpBeforeClass; used when creating WAL keys.
  private static RegionInfo RI;

  // Family to scope value map passed to the WAL keys created by createWALFile (F1 -> 1, F2 -> 0).
  private static NavigableMap<byte[], Integer> SCOPES;

  // Exposes the name of the running test method; setUp uses it to derive a per-test root
  // directory and replication queue table name.
  @Rule
  public final TestName name = new TestName();

  // Per-test directories below the per-test root directory, all created in setUp.
  private Path oldLogDir;

  private Path logDir;

  private Path remoteLogDir;

  // Mockito mock which returns CONF, the ZooKeeper watcher and connection of UTIL, and a fixed
  // server name.
  private Server server;

  // Replication instance initialized in setUp and stopped in tearDown.
  private Replication replication;

  // The manager under test, obtained from the replication instance in setUp.
  private ReplicationSourceManager manager;
144
145  @BeforeClass
146  public static void setUpBeforeClass() throws Exception {
147    UTIL.startMiniCluster(1);
148    FS = UTIL.getTestFileSystem();
149    CONF = new Configuration(UTIL.getConfiguration());
150    CONF.setLong("replication.sleep.before.failover", 0);
151
152    RI = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
153    SCOPES = new TreeMap<>(Bytes.BYTES_COMPARATOR);
154    SCOPES.put(F1, 1);
155    SCOPES.put(F2, 0);
156  }
157
  /**
   * Shuts down the mini cluster which was started in {@link #setUpBeforeClass()}.
   */
  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    UTIL.shutdownMiniCluster();
  }
162
163  @Before
164  public void setUp() throws Exception {
165    Path rootDir = UTIL.getDataTestDirOnTestFS(name.getMethodName());
166    CommonFSUtils.setRootDir(CONF, rootDir);
167    server = mock(Server.class);
168    when(server.getConfiguration()).thenReturn(CONF);
169    when(server.getZooKeeper()).thenReturn(UTIL.getZooKeeperWatcher());
170    when(server.getConnection()).thenReturn(UTIL.getConnection());
171    ServerName sn = ServerName.valueOf("hostname.example.org", 1234, 1);
172    when(server.getServerName()).thenReturn(sn);
173    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
174    FS.mkdirs(oldLogDir);
175    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
176    FS.mkdirs(logDir);
177    remoteLogDir = new Path(rootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
178    FS.mkdirs(remoteLogDir);
179    TableName tableName = TableName.valueOf("replication_" + name.getMethodName());
180    UTIL.getAdmin()
181      .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName));
182    CONF.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString());
183
184    replication = new Replication();
185    replication.initialize(server, FS, new Path(logDir, sn.toString()), oldLogDir,
186      new WALFactory(CONF, server.getServerName(), null));
187    manager = replication.getReplicationManager();
188  }
189
  /**
   * Stops the replication service which was created for the test method in {@link #setUp()}.
   */
  @After
  public void tearDown() {
    replication.stopReplicationService();
  }
194
195  /**
196   * Add a peer and wait for it to initialize
197   */
198  private void addPeerAndWait(String peerId, String clusterKey, boolean syncRep)
199    throws ReplicationException, IOException {
200    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
201      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKey)
202      .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName());
203    if (syncRep) {
204      builder.setTableCFsMap(ImmutableMap.of(TABLE_NAME, Collections.emptyList()))
205        .setRemoteWALDir(FS.makeQualified(remoteLogDir).toString());
206    }
207
208    manager.getReplicationPeers().getPeerStorage().addPeer(peerId, builder.build(), true,
209      syncRep ? SyncReplicationState.DOWNGRADE_ACTIVE : SyncReplicationState.NONE);
210    manager.addPeer(peerId);
211    UTIL.waitFor(20000, () -> {
212      ReplicationSourceInterface rs = manager.getSource(peerId);
213      return rs != null && rs.isSourceActive();
214    });
215  }
216
217  /**
218   * Remove a peer and wait for it to get cleaned up
219   */
220  private void removePeerAndWait(String peerId) throws Exception {
221    ReplicationPeers rp = manager.getReplicationPeers();
222    rp.getPeerStorage().removePeer(peerId);
223    manager.removePeer(peerId);
224    UTIL.waitFor(20000, () -> {
225      if (rp.getPeer(peerId) != null) {
226        return false;
227      }
228      if (manager.getSource(peerId) != null) {
229        return false;
230      }
231      return manager.getOldSources().stream().noneMatch(rs -> rs.getPeerId().equals(peerId));
232    });
233  }
234
235  private void createWALFile(Path file) throws Exception {
236    ProtobufLogWriter writer = new ProtobufLogWriter();
237    try {
238      writer.init(FS, file, CONF, false, FS.getDefaultBlockSize(file), null);
239      WALKeyImpl key = new WALKeyImpl(RI.getEncodedNameAsBytes(), TABLE_NAME,
240        EnvironmentEdgeManager.currentTime(), SCOPES);
241      WALEdit edit = new WALEdit();
242      WALEditInternalHelper.addExtendedCell(edit,
243        ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(F1).setFamily(F1)
244          .setQualifier(F1).setType(Cell.Type.Put).setValue(F1).build());
245      WALEditInternalHelper.addExtendedCell(edit,
246        ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(F2).setFamily(F2)
247          .setQualifier(F2).setType(Cell.Type.Put).setValue(F2).build());
248      writer.append(new WAL.Entry(key, edit));
249      writer.sync(false);
250    } finally {
251      writer.close();
252    }
253  }
254
255  @Test
256  public void testClaimQueue() throws Exception {
257    String peerId = "1";
258    addPeerAndWait(peerId, "error", false);
259    ServerName serverName = ServerName.valueOf("hostname0.example.org", 12345, 123);
260    String walName1 = serverName.toString() + ".1";
261    createWALFile(new Path(oldLogDir, walName1));
262    ReplicationQueueId queueId = new ReplicationQueueId(serverName, peerId);
263    ReplicationQueueStorage queueStorage = manager.getQueueStorage();
264    queueStorage.setOffset(queueId, "", new ReplicationGroupOffset(peerId, 0),
265      Collections.emptyMap());
266    manager.claimQueue(queueId);
267    assertThat(manager.getOldSources(), hasSize(1));
268  }
269
270  @Test
271  public void testSameWALPrefix() throws IOException {
272    String walName1 = "localhost,8080,12345-45678-Peer.34567";
273    String walName2 = "localhost,8080,12345.56789";
274    manager.postLogRoll(new Path(walName1));
275    manager.postLogRoll(new Path(walName2));
276
277    Set<String> latestWals =
278      manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet());
279    assertThat(latestWals,
280      Matchers.<Set<String>> both(hasSize(2)).and(hasItems(walName1, walName2)));
281  }
282
283  private MetricsReplicationSourceSource getGlobalSource() {
284    return CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
285      .getGlobalSource();
286  }
287
288  @Test
289  public void testRemovePeerMetricsCleanup() throws Exception {
290    MetricsReplicationSourceSource globalSource = getGlobalSource();
291    int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
292    String peerId = "DummyPeer";
293    addPeerAndWait(peerId, "hbase", false);
294    // there is no latestPaths so the size of log queue should not change
295    assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
296
297    ReplicationSourceInterface source = manager.getSource(peerId);
298    // Sanity check
299    assertNotNull(source);
300    int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
301    // Enqueue log and check if metrics updated
302    Path serverLogDir = new Path(logDir, server.getServerName().toString());
303    source.enqueueLog(new Path(serverLogDir, server.getServerName() + ".1"));
304    assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
305    assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
306      globalSource.getSizeOfLogQueue());
307
308    // Removing the peer should reset the global metrics
309    removePeerAndWait(peerId);
310    assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
311
312    // Adding the same peer back again should reset the single source metrics
313    addPeerAndWait(peerId, "hbase", false);
314    source = manager.getSource(peerId);
315    assertNotNull(source);
316    assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
317      globalSource.getSizeOfLogQueue());
318  }
319
320  @Test
321  public void testDisablePeerMetricsCleanup() throws Exception {
322    final String peerId = "DummyPeer";
323    try {
324      MetricsReplicationSourceSource globalSource = getGlobalSource();
325      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
326      addPeerAndWait(peerId, "hbase", false);
327      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
328      ReplicationSourceInterface source = manager.getSource(peerId);
329      // Sanity check
330      assertNotNull(source);
331      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
332      // Enqueue log and check if metrics updated
333      Path serverLogDir = new Path(logDir, server.getServerName().toString());
334      source.enqueueLog(new Path(serverLogDir, server.getServerName() + ".1"));
335      assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
336      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
337        globalSource.getSizeOfLogQueue());
338
339      // Refreshing the peer should decrement the global and single source metrics
340      manager.refreshSources(peerId);
341      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
342
343      source = manager.getSource(peerId);
344      assertNotNull(source);
345      assertEquals(sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
346      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
347        globalSource.getSizeOfLogQueue());
348    } finally {
349      removePeerAndWait(peerId);
350    }
351  }
352
353  @Test
354  public void testRemoveRemoteWALs() throws Exception {
355    String peerId = "2";
356    addPeerAndWait(peerId, "hbase", true);
357    // make sure that we can deal with files which does not exist
358    String walNameNotExists =
359      "remoteWAL-12345-" + peerId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX;
360    Path wal = new Path(logDir, walNameNotExists);
361    manager.postLogRoll(wal);
362
363    Path remoteLogDirForPeer = new Path(remoteLogDir, peerId);
364    FS.mkdirs(remoteLogDirForPeer);
365    String walName = "remoteWAL-12345-" + peerId + ".23456" + ReplicationUtils.SYNC_WAL_SUFFIX;
366    Path remoteWAL =
367      new Path(remoteLogDirForPeer, walName).makeQualified(FS.getUri(), FS.getWorkingDirectory());
368    FS.create(remoteWAL).close();
369    wal = new Path(logDir, walName);
370    manager.postLogRoll(wal);
371
372    ReplicationSourceInterface source = manager.getSource(peerId);
373    manager.cleanOldLogs(walName, true, source);
374    assertFalse(FS.exists(remoteWAL));
375  }
376}