/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyStateData;

/**
 * The procedure for replaying all the remote wals for transitioning a sync replication peer from
 * STANDBY to DOWNGRADE_ACTIVE.
 * <p>
 * The states are executed in this order:
 * <ol>
 * <li>{@code RENAME_SYNC_REPLICATION_WALS_DIR}</li>
 * <li>{@code REGISTER_PEER_TO_WORKER_STORAGE}</li>
 * <li>{@code DISPATCH_WALS}</li>
 * <li>{@code UNREGISTER_PEER_FROM_WORKER_STORAGE}</li>
 * <li>{@code SNAPSHOT_SYNC_REPLICATION_WALS_DIR}</li>
 * </ol>
 */
@InterfaceAudience.Private
public class RecoverStandbyProcedure extends AbstractPeerNoLockProcedure<RecoverStandbyState> {

  private static final Logger LOG = LoggerFactory.getLogger(RecoverStandbyProcedure.class);

  /**
   * Flag supplied by the caller and persisted with the procedure state. It is not consulted when
   * dispatching wals yet, see the TODO on {@link #dispatchWals(SyncReplicationReplayWALManager)}.
   */
  private boolean serial;

  /**
   * No-arg constructor; the fields are filled in by
   * {@link #deserializeStateData(ProcedureStateSerializer)}.
   */
  public RecoverStandbyProcedure() {
  }

  /**
   * @param peerId the id of the sync replication peer to recover
   * @param serial stored and serialized with the procedure state
   */
  public RecoverStandbyProcedure(String peerId, boolean serial) {
    super(peerId);
    this.serial = serial;
  }

  /**
   * Executes a single state of the recovery and selects the next one.
   * <p>
   * An {@link IOException} in the first (rename) state fails the procedure. An {@link IOException}
   * while listing the wals to replay, or in the final (snapshot) state, is logged and turned into
   * a {@link ProcedureYieldException} so that the same state is executed again.
   * @param env the master procedure environment, used to look up the replay WAL manager
   * @param state the state to execute
   * @return {@link Flow#HAS_MORE_STATE} while there are states left, {@link Flow#NO_MORE_STATE}
   *         after the last state or after the procedure has been marked as failed
   * @throws ProcedureYieldException if the current state should be retried
   * @throws UnsupportedOperationException if {@code state} is not handled by this procedure
   */
  @Override
  protected Flow executeFromState(MasterProcedureEnv env, RecoverStandbyState state)
      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
    SyncReplicationReplayWALManager syncReplicationReplayWALManager =
      env.getMasterServices().getSyncReplicationReplayWALManager();
    switch (state) {
      case RENAME_SYNC_REPLICATION_WALS_DIR:
        try {
          syncReplicationReplayWALManager.renameToPeerReplayWALDir(peerId);
        } catch (IOException e) {
          // Unlike the later states, a failure here is not retried: the procedure is failed.
          LOG.warn("Failed to rename remote wal dir for peer id={}", peerId, e);
          setFailure("master-recover-standby", e);
          return Flow.NO_MORE_STATE;
        }
        setNextState(RecoverStandbyState.REGISTER_PEER_TO_WORKER_STORAGE);
        return Flow.HAS_MORE_STATE;
      case REGISTER_PEER_TO_WORKER_STORAGE:
        syncReplicationReplayWALManager.registerPeer(peerId);
        setNextState(RecoverStandbyState.DISPATCH_WALS);
        return Flow.HAS_MORE_STATE;
      case DISPATCH_WALS:
        dispatchWals(syncReplicationReplayWALManager);
        setNextState(RecoverStandbyState.UNREGISTER_PEER_FROM_WORKER_STORAGE);
        return Flow.HAS_MORE_STATE;
      case UNREGISTER_PEER_FROM_WORKER_STORAGE:
        syncReplicationReplayWALManager.unregisterPeer(peerId);
        setNextState(RecoverStandbyState.SNAPSHOT_SYNC_REPLICATION_WALS_DIR);
        return Flow.HAS_MORE_STATE;
      case SNAPSHOT_SYNC_REPLICATION_WALS_DIR:
        try {
          syncReplicationReplayWALManager.renameToPeerSnapshotWALDir(peerId);
        } catch (IOException e) {
          LOG.warn("Failed to rename replay wals dir to snapshot wals dir for peer id={}, retry",
            peerId, e);
          throw new ProcedureYieldException();
        }
        return Flow.NO_MORE_STATE;
      default:
        throw new UnsupportedOperationException("unhandled state=" + state);
    }
  }

  /**
   * Adds one {@link SyncReplicationReplayWALProcedure} child per wal returned by the replay WAL
   * manager for this peer. Each child receives a single-element list holding the wal path after
   * it has been passed through {@code removeWALRootPath}.
   * @param syncReplicationReplayWALManager the manager used to list the wals to replay
   * @throws ProcedureYieldException if listing the wals fails, so that the state is retried
   */
  // TODO: dispatch wals by region server when serial is true and sort wals
  private void dispatchWals(SyncReplicationReplayWALManager syncReplicationReplayWALManager)
      throws ProcedureYieldException {
    try {
      List<Path> wals = syncReplicationReplayWALManager.getReplayWALsAndCleanUpUnusedFiles(peerId);
      addChildProcedure(wals.stream()
        .map(wal -> new SyncReplicationReplayWALProcedure(peerId,
          Arrays.asList(syncReplicationReplayWALManager.removeWALRootPath(wal))))
        .toArray(SyncReplicationReplayWALProcedure[]::new));
    } catch (IOException e) {
      LOG.warn("Failed to get replay wals for peer id={}, retry", peerId, e);
      throw new ProcedureYieldException();
    }
  }

  /** Maps a numeric state id to its {@link RecoverStandbyState}. */
  @Override
  protected RecoverStandbyState getState(int stateId) {
    return RecoverStandbyState.forNumber(stateId);
  }

  /** Maps a {@link RecoverStandbyState} to its numeric id. */
  @Override
  protected int getStateId(RecoverStandbyState state) {
    return state.getNumber();
  }

  /** The procedure starts by renaming the sync replication wals dir. */
  @Override
  protected RecoverStandbyState getInitialState() {
    return RecoverStandbyState.RENAME_SYNC_REPLICATION_WALS_DIR;
  }

  @Override
  public PeerOperationType getPeerOperationType() {
    return PeerOperationType.RECOVER_STANDBY;
  }

  /**
   * Writes the parent state first, followed by a {@link RecoverStandbyStateData} message carrying
   * the {@code serial} flag.
   */
  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.serializeStateData(serializer);
    serializer.serialize(RecoverStandbyStateData.newBuilder().setSerial(serial).build());
  }

  /**
   * Reads the parent state first, then restores the {@code serial} flag from the
   * {@link RecoverStandbyStateData} message; the order mirrors
   * {@link #serializeStateData(ProcedureStateSerializer)}.
   */
  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.deserializeStateData(serializer);
    RecoverStandbyStateData data = serializer.deserialize(RecoverStandbyStateData.class);
    serial = data.getSerial();
  }

  @Override
  protected void afterReplay(MasterProcedureEnv env) {
    // For these two states, we need to register the peer to the replay manager, as the state is
    // only kept in memory and will be lost after restarting. And in
    // SyncReplicationReplayWALProcedure.afterReplay we will reconstruct the used workers.
    switch (getCurrentState()) {
      case DISPATCH_WALS:
      case UNREGISTER_PEER_FROM_WORKER_STORAGE:
        env.getMasterServices().getSyncReplicationReplayWALManager().registerPeer(peerId);
        break;
      default:
        break;
    }
  }
}