001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyState.DISPATCH_WALS_VALUE; 021import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyState.UNREGISTER_PEER_FROM_WORKER_STORAGE_VALUE; 022 023import java.io.IOException; 024import java.io.UncheckedIOException; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.HBaseClassTestRule; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.master.HMaster; 029import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 030import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 031import org.apache.hadoop.hbase.replication.SyncReplicationState; 032import org.apache.hadoop.hbase.replication.SyncReplicationTestBase; 033import org.apache.hadoop.hbase.testclassification.LargeTests; 034import org.apache.hadoop.hbase.testclassification.MasterTests; 035import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 036import org.junit.BeforeClass; 037import 
org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Testcase for HBASE-21494.
 * <p>
 * Drives the standby cluster's transition to DOWNGRADE_ACTIVE, fails over its master while a
 * {@link RecoverStandbyProcedure} is in one of the states where the peer worker has to be
 * registered again on restart, and checks that the new master can still finish the transition.
 */
@Category({ MasterTests.class, LargeTests.class })
public class TestRegisterPeerWorkerWhenRestarting extends SyncReplicationTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegisterPeerWorkerWhenRestarting.class);

  /**
   * While set, {@link HMasterForTest} throws instead of completing
   * {@link SyncReplicationReplayWALRemoteProcedure}s, so the standby transition cannot finish.
   */
  private static volatile boolean FAIL = false;

  /**
   * Master implementation that, when {@code FAIL} is set, throws from
   * {@code remoteProcedureCompleted} for replay-WAL remote procedures rather than passing the
   * completion on to {@link HMaster}.
   */
  public static final class HMasterForTest extends HMaster {

    public HMasterForTest(Configuration conf) throws IOException {
      super(conf);
    }

    @Override
    public void remoteProcedureCompleted(long procId) {
      // Only look the procedure up when injection is enabled.
      if (FAIL) {
        Object completed = getMasterProcedureExecutor().getProcedure(procId);
        if (completed instanceof SyncReplicationReplayWALRemoteProcedure) {
          throw new RuntimeException("Inject error");
        }
      }
      super.remoteProcedureCompleted(procId);
    }
  }

  /**
   * Installs the error-injecting master on the standby cluster, then runs the base class setup.
   * <p>
   * The master implementation is configured before {@code SyncReplicationTestBase.setUp()} is
   * invoked (which presumably starts the mini clusters). This static method shares the base
   * method's name, so the base setup is called explicitly here.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    UTIL2.getConfiguration().setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
    SyncReplicationTestBase.setUp();
  }

  @Test
  public void testRestart() throws Exception {
    // UTIL2 becomes the standby side, UTIL1 the active side.
    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.STANDBY);
    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.ACTIVE);

    // With the peer disabled, nothing written on the active side may show up on the standby.
    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
    write(UTIL1, 0, 100);
    Thread.sleep(2000);
    verifyNotReplicatedThroughRegion(UTIL2, 0, 100);

    // Move the active side to DOWNGRADE_ACTIVE first to avoid too many error logs.
    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.DOWNGRADE_ACTIVE);

    HMaster standbyMaster = UTIL2.getHBaseCluster().getMaster();
    // Keep the standby transition from completing until after the failover.
    FAIL = true;
    ProcedureExecutor<MasterProcedureEnv> executor = standbyMaster.getMasterProcedureExecutor();
    // NOTE(review): an exception thrown here only reaches the thread's uncaught-exception
    // handler; it does not fail the test by itself.
    Thread transiter = new Thread(() -> {
      try {
        UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
          SyncReplicationState.DOWNGRADE_ACTIVE);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    });
    transiter.start();

    // Block until some RecoverStandbyProcedure reaches a state that needs the peer worker to be
    // registered again when the master restarts.
    UTIL2.waitFor(60000,
      () -> executor.getProcedures().stream().filter(RecoverStandbyProcedure.class::isInstance)
        .map(RecoverStandbyProcedure.class::cast)
        .anyMatch(TestRegisterPeerWorkerWhenRestarting::inWorkerRegistrationState));

    // Fail over: abort the current master and wait for its thread to exit.
    MasterThread masterThread = UTIL2.getMiniHBaseCluster().getMasterThread();
    masterThread.getMaster().abort("for testing");
    masterThread.join();
    FAIL = false;
    transiter.join();

    // The replacement master must be able to complete the transition.
    UTIL2.waitFor(60000, () -> UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID)
      == SyncReplicationState.DOWNGRADE_ACTIVE);
    verify(UTIL2, 0, 100);
  }

  /** Returns true if the procedure is in DISPATCH_WALS or UNREGISTER_PEER_FROM_WORKER_STORAGE. */
  private static boolean inWorkerRegistrationState(RecoverStandbyProcedure proc) {
    int stateId = proc.getCurrentStateId();
    return stateId == DISPATCH_WALS_VALUE || stateId == UNREGISTER_PEER_FROM_WORKER_STORAGE_VALUE;
  }
}