001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import java.io.IOException; 021import java.util.Collections; 022import java.util.List; 023import org.apache.hadoop.hbase.ServerName; 024import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 025import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface; 026import org.apache.hadoop.hbase.procedure2.Procedure; 027import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 028import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 029import org.apache.hadoop.hbase.procedure2.ProcedureUtil; 030import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; 031import org.apache.hadoop.hbase.replication.ReplicationException; 032import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 033import org.apache.hadoop.hbase.util.RetryCounter; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueuesStateData; 040import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 041 042/** 043 * Used to assign the replication queues of a dead server to other region servers. 044 */ 045@InterfaceAudience.Private 046public class ClaimReplicationQueuesProcedure extends Procedure<MasterProcedureEnv> 047 implements ServerProcedureInterface { 048 049 private static final Logger LOG = LoggerFactory.getLogger(ClaimReplicationQueuesProcedure.class); 050 051 private ServerName crashedServer; 052 053 private RetryCounter retryCounter; 054 055 public ClaimReplicationQueuesProcedure() { 056 } 057 058 public ClaimReplicationQueuesProcedure(ServerName crashedServer) { 059 this.crashedServer = crashedServer; 060 } 061 062 @Override 063 public ServerName getServerName() { 064 return crashedServer; 065 } 066 067 @Override 068 public boolean hasMetaTableRegion() { 069 return false; 070 } 071 072 @Override 073 public ServerOperationType getServerOperationType() { 074 return ServerOperationType.CLAIM_REPLICATION_QUEUES; 075 } 076 077 @Override 078 protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env) 079 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 080 ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage(); 081 try { 082 List<String> queues = storage.getAllQueues(crashedServer); 083 if (queues.isEmpty()) { 084 LOG.debug("Finish claiming replication queues for {}", crashedServer); 085 storage.removeReplicatorIfQueueIsEmpty(crashedServer); 086 // we are done 087 return null; 088 } 089 LOG.debug("There are {} replication queues need to be claimed for {}", queues.size(), 090 crashedServer); 091 List<ServerName> targetServers = 092 env.getMasterServices().getServerManager().getOnlineServersList(); 093 if (targetServers.isEmpty()) { 094 throw new ReplicationException("no region server available"); 095 } 096 Collections.shuffle(targetServers); 097 ClaimReplicationQueueRemoteProcedure[] procs = 098 new ClaimReplicationQueueRemoteProcedure[Math.min(queues.size(), targetServers.size())]; 099 for (int i = 0; i < procs.length; i++) { 100 procs[i] = new ClaimReplicationQueueRemoteProcedure(crashedServer, queues.get(i), 101 targetServers.get(i)); 102 } 103 return procs; 104 } catch (ReplicationException e) { 105 if (retryCounter == null) { 106 retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); 107 } 108 long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); 109 LOG.warn("Failed to claim replication queues for {}, suspend {}secs {}; {};", crashedServer, 110 backoff / 1000, e); 111 setTimeout(Math.toIntExact(backoff)); 112 setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); 113 skipPersistence(); 114 throw new ProcedureSuspendedException(); 115 } 116 } 117 118 @Override 119 protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { 120 setState(ProcedureProtos.ProcedureState.RUNNABLE); 121 env.getProcedureScheduler().addFront(this); 122 return false; 123 } 124 125 @Override 126 protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { 127 throw new UnsupportedOperationException(); 128 } 129 130 @Override 131 protected boolean abort(MasterProcedureEnv env) { 132 return false; 133 } 134 135 @Override 136 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 137 serializer.serialize(ClaimReplicationQueuesStateData.newBuilder() 138 .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).build()); 139 } 140 141 @Override 142 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 143 ClaimReplicationQueuesStateData data = 144 serializer.deserialize(ClaimReplicationQueuesStateData.class); 145 crashedServer = ProtobufUtil.toServerName(data.getCrashedServer()); 146 } 147}