/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Base class for testing synchronous replication with a peer in the ACTIVE state: peer state
 * transitions, rejection of replication requests, and the contents of the remote WALs.
 */
@Category({ ReplicationTests.class, LargeTests.class })
public class SyncReplicationActiveTestBase extends SyncReplicationTestBase {

  @Test
  public void testActive() throws Exception {
    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.STANDBY);
    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.ACTIVE);

    // confirm that a peer in state A will reject replication requests
    verifyReplicationRequestRejection(UTIL1, true);
    verifyReplicationRequestRejection(UTIL2, false);

    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
    write(UTIL1, 0, 100);
    Thread.sleep(2000);
    // the peer is disabled, so no data has been replicated
    verifyNotReplicatedThroughRegion(UTIL2, 0, 100);

    // ensure that there is no cluster id in the remote log entries
    verifyNoClusterIdInRemoteLog(UTIL2, REMOTE_WAL_DIR2, PEER_ID);

    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.DOWNGRADE_ACTIVE);
    // confirm that a peer in state DA will reject replication requests
    verifyReplicationRequestRejection(UTIL2, true);
    // confirm that the data is there after we convert the peer to DA
    verify(UTIL2, 0, 100);

    try (AsyncConnection conn =
      ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) {
      AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1).build();
      CompletableFuture<Void> future =
        table.put(new Put(Bytes.toBytes(1000)).addColumn(CF, CQ, Bytes.toBytes(1000)));
      Thread.sleep(2000);
      // the put should hang on the wal roll
      assertFalse(future.isDone());
      UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
        SyncReplicationState.STANDBY);
      try {
        future.get();
        fail("should fail because the wal is closing");
      } catch (ExecutionException e) {
        // expected
        assertThat(e.getCause().getMessage(), containsString("only marker edit is allowed"));
      }
    }
    // confirm that the data has not been persisted
    HRegion region = UTIL1.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
    assertTrue(region.get(new Get(Bytes.toBytes(1000))).isEmpty());
    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.ACTIVE);

    writeAndVerifyReplication(UTIL2, UTIL1, 100, 200);

    // shut down the cluster completely
    UTIL1.shutdownMiniCluster();
    // confirm that we can convert to DA even if the remote slave cluster is down
    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.DOWNGRADE_ACTIVE);
    // confirm that a peer in state DA will reject replication requests
    verifyReplicationRequestRejection(UTIL2, true);
    write(UTIL2, 200, 300);
  }

  private void verifyNoClusterIdInRemoteLog(HBaseTestingUtil utility, Path remoteDir,
    String peerId) throws Exception {
    FileSystem fs2 = utility.getTestFileSystem();
    FileStatus[] files = fs2.listStatus(new Path(remoteDir, peerId));
    Assert.assertTrue(files.length > 0);
    for (FileStatus file : files) {
      try (WALStreamReader reader =
        NoEOFWALStreamReader.create(fs2, file.getPath(), utility.getConfiguration())) {
        Entry entry = reader.next();
        Assert.assertNotNull(entry);
        while (entry != null) {
          // remote wal entries should not carry any cluster ids
          Assert.assertEquals(0, entry.getKey().getClusterIds().size());
          entry = reader.next();
        }
      }
    }
  }
}