001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.Collections; 023import java.util.List; 024import java.util.stream.Collectors; 025import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 026import org.apache.hadoop.hbase.master.MasterCoprocessorHost; 027import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 028import org.apache.hadoop.hbase.procedure2.Procedure; 029import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 030import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 031import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 032import org.apache.hadoop.hbase.replication.ReplicationException; 033import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RemovePeerStateData; 039 040/** 041 * The procedure for removing a replication peer. 042 */ 043@InterfaceAudience.Private 044public class RemovePeerProcedure extends ModifyPeerProcedure { 045 046 private static final Logger LOG = LoggerFactory.getLogger(RemovePeerProcedure.class); 047 048 private ReplicationPeerConfig peerConfig; 049 050 private List<Long> ongoingAssignReplicationQueuesProcIds = Collections.emptyList(); 051 052 public RemovePeerProcedure() { 053 } 054 055 public RemovePeerProcedure(String peerId) { 056 super(peerId); 057 } 058 059 @Override 060 public PeerOperationType getPeerOperationType() { 061 return PeerOperationType.REMOVE; 062 } 063 064 @Override 065 protected void prePeerModification(MasterProcedureEnv env) throws IOException { 066 MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); 067 if (cpHost != null) { 068 cpHost.preRemoveReplicationPeer(peerId); 069 } 070 peerConfig = env.getReplicationPeerManager().preRemovePeer(peerId); 071 } 072 073 @Override 074 protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { 075 env.getReplicationPeerManager().removePeer(peerId); 076 // record ongoing AssignReplicationQueuesProcedures after we update the peer storage 077 ongoingAssignReplicationQueuesProcIds = env.getMasterServices().getMasterProcedureExecutor() 078 .getProcedures().stream().filter(p -> p instanceof AssignReplicationQueuesProcedure) 079 .filter(p -> !p.isFinished()).map(Procedure::getProcId).collect(Collectors.toList()); 080 } 081 082 private void removeRemoteWALs(MasterProcedureEnv env) throws IOException { 083 env.getMasterServices().getSyncReplicationReplayWALManager().removePeerRemoteWALs(peerId); 084 } 085 086 private void checkAssignReplicationQueuesFinished(MasterProcedureEnv env) 087 throws ProcedureSuspendedException { 088 if (ongoingAssignReplicationQueuesProcIds.isEmpty()) { 089 LOG.info("No ongoing assign replication queues procedures when removing peer {}, move on", 090 peerId); 091 } 092 ProcedureExecutor<MasterProcedureEnv> procExec = 093 env.getMasterServices().getMasterProcedureExecutor(); 094 long[] unfinishedProcIds = 095 ongoingAssignReplicationQueuesProcIds.stream().map(procExec::getProcedure) 096 .filter(p -> p != null && !p.isFinished()).mapToLong(Procedure::getProcId).toArray(); 097 if (unfinishedProcIds.length == 0) { 098 LOG.info( 099 "All assign replication queues procedures are finished when removing peer {}, move on", 100 peerId); 101 } else { 102 throw suspend(env.getMasterConfiguration(), backoff -> LOG.info( 103 "There are still {} pending assign replication queues procedures {} when removing peer {}, sleep {} secs", 104 unfinishedProcIds.length, Arrays.toString(unfinishedProcIds), peerId, backoff / 1000)); 105 } 106 } 107 108 @Override 109 protected void postPeerModification(MasterProcedureEnv env) 110 throws IOException, ReplicationException, ProcedureSuspendedException { 111 checkAssignReplicationQueuesFinished(env); 112 113 if (peerConfig.isSyncReplication()) { 114 removeRemoteWALs(env); 115 } 116 env.getReplicationPeerManager().removeAllQueuesAndHFileRefs(peerId); 117 if (peerConfig.isSerial()) { 118 env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId); 119 } 120 LOG.info("Successfully removed peer {}", peerId); 121 MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); 122 if (cpHost != null) { 123 cpHost.postRemoveReplicationPeer(peerId); 124 } 125 } 126 127 @Override 128 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 129 super.serializeStateData(serializer); 130 RemovePeerStateData.Builder builder = RemovePeerStateData.newBuilder(); 131 if (peerConfig != null) { 132 builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)); 133 } 134 builder.addAllOngoingAssignReplicationQueuesProcIds(ongoingAssignReplicationQueuesProcIds); 135 serializer.serialize(builder.build()); 136 } 137 138 @Override 139 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 140 super.deserializeStateData(serializer); 141 RemovePeerStateData data = serializer.deserialize(RemovePeerStateData.class); 142 if (data.hasPeerConfig()) { 143 this.peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig()); 144 } 145 ongoingAssignReplicationQueuesProcIds = data.getOngoingAssignReplicationQueuesProcIdsList(); 146 } 147}