001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.ExecutionException;
028import org.apache.hadoop.fs.FileStatus;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.client.AsyncConnection;
033import org.apache.hadoop.hbase.client.AsyncTable;
034import org.apache.hadoop.hbase.client.ConnectionFactory;
035import org.apache.hadoop.hbase.client.Get;
036import org.apache.hadoop.hbase.client.Put;
037import org.apache.hadoop.hbase.regionserver.HRegion;
038import org.apache.hadoop.hbase.testclassification.LargeTests;
039import org.apache.hadoop.hbase.testclassification.ReplicationTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
042import org.apache.hadoop.hbase.wal.WAL.Entry;
043import org.apache.hadoop.hbase.wal.WALStreamReader;
044import org.junit.Assert;
045import org.junit.Test;
046import org.junit.experimental.categories.Category;
047
048@Category({ ReplicationTests.class, LargeTests.class })
049public class SyncReplicationActiveTestBase extends SyncReplicationTestBase {
050
051  @Test
052  public void testActive() throws Exception {
053    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
054      SyncReplicationState.STANDBY);
055    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
056      SyncReplicationState.ACTIVE);
057
058    // confirm that peer with state A will reject replication request.
059    verifyReplicationRequestRejection(UTIL1, true);
060    verifyReplicationRequestRejection(UTIL2, false);
061
062    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
063    write(UTIL1, 0, 100);
064    Thread.sleep(2000);
065    // peer is disabled so no data have been replicated
066    verifyNotReplicatedThroughRegion(UTIL2, 0, 100);
067
068    // Ensure that there's no cluster id in remote log entries.
069    verifyNoClusterIdInRemoteLog(UTIL2, REMOTE_WAL_DIR2, PEER_ID);
070
071    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
072      SyncReplicationState.DOWNGRADE_ACTIVE);
073    // confirm that peer with state DA will reject replication request.
074    verifyReplicationRequestRejection(UTIL2, true);
075    // confirm that the data is there after we convert the peer to DA
076    verify(UTIL2, 0, 100);
077
078    try (AsyncConnection conn =
079      ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) {
080      AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1).build();
081      CompletableFuture<Void> future =
082        table.put(new Put(Bytes.toBytes(1000)).addColumn(CF, CQ, Bytes.toBytes(1000)));
083      Thread.sleep(2000);
084      // should hang on rolling
085      assertFalse(future.isDone());
086      UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
087        SyncReplicationState.STANDBY);
088      try {
089        future.get();
090        fail("should fail because of the wal is closing");
091      } catch (ExecutionException e) {
092        // expected
093        assertThat(e.getCause().getMessage(), containsString("only marker edit is allowed"));
094      }
095    }
096    // confirm that the data has not been persisted
097    HRegion region = UTIL1.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
098    assertTrue(region.get(new Get(Bytes.toBytes(1000))).isEmpty());
099    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
100      SyncReplicationState.ACTIVE);
101
102    writeAndVerifyReplication(UTIL2, UTIL1, 100, 200);
103
104    // shutdown the cluster completely
105    UTIL1.shutdownMiniCluster();
106    // confirm that we can convert to DA even if the remote slave cluster is down
107    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
108      SyncReplicationState.DOWNGRADE_ACTIVE);
109    // confirm that peer with state DA will reject replication request.
110    verifyReplicationRequestRejection(UTIL2, true);
111    write(UTIL2, 200, 300);
112  }
113
114  private void verifyNoClusterIdInRemoteLog(HBaseTestingUtil utility, Path remoteDir, String peerId)
115    throws Exception {
116    FileSystem fs2 = utility.getTestFileSystem();
117    FileStatus[] files = fs2.listStatus(new Path(remoteDir, peerId));
118    Assert.assertTrue(files.length > 0);
119    for (FileStatus file : files) {
120      try (WALStreamReader reader =
121        NoEOFWALStreamReader.create(fs2, file.getPath(), utility.getConfiguration())) {
122        Entry entry = reader.next();
123        Assert.assertTrue(entry != null);
124        while (entry != null) {
125          Assert.assertEquals(entry.getKey().getClusterIds().size(), 0);
126          entry = reader.next();
127        }
128      }
129    }
130  }
131}