001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.List;
023import org.apache.hadoop.hbase.ServerName;
024import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
025import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
026import org.apache.hadoop.hbase.procedure2.Procedure;
027import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
028import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
029import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
030import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
031import org.apache.hadoop.hbase.replication.ReplicationException;
032import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
033import org.apache.hadoop.hbase.util.RetryCounter;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueuesStateData;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
041
042/**
043 * Used to assign the replication queues of a dead server to other region servers.
044 */
045@InterfaceAudience.Private
046public class ClaimReplicationQueuesProcedure extends Procedure<MasterProcedureEnv>
047  implements ServerProcedureInterface {
048
049  private static final Logger LOG = LoggerFactory.getLogger(ClaimReplicationQueuesProcedure.class);
050
051  private ServerName crashedServer;
052
053  private RetryCounter retryCounter;
054
055  public ClaimReplicationQueuesProcedure() {
056  }
057
058  public ClaimReplicationQueuesProcedure(ServerName crashedServer) {
059    this.crashedServer = crashedServer;
060  }
061
062  @Override
063  public ServerName getServerName() {
064    return crashedServer;
065  }
066
067  @Override
068  public boolean hasMetaTableRegion() {
069    return false;
070  }
071
072  @Override
073  public ServerOperationType getServerOperationType() {
074    return ServerOperationType.CLAIM_REPLICATION_QUEUES;
075  }
076
077  @Override
078  protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
079    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
080    ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
081    try {
082      List<String> queues = storage.getAllQueues(crashedServer);
083      if (queues.isEmpty()) {
084        LOG.debug("Finish claiming replication queues for {}", crashedServer);
085        storage.removeReplicatorIfQueueIsEmpty(crashedServer);
086        // we are done
087        return null;
088      }
089      LOG.debug("There are {} replication queues need to be claimed for {}", queues.size(),
090        crashedServer);
091      List<ServerName> targetServers =
092        env.getMasterServices().getServerManager().getOnlineServersList();
093      if (targetServers.isEmpty()) {
094        throw new ReplicationException("no region server available");
095      }
096      Collections.shuffle(targetServers);
097      ClaimReplicationQueueRemoteProcedure[] procs =
098        new ClaimReplicationQueueRemoteProcedure[Math.min(queues.size(), targetServers.size())];
099      for (int i = 0; i < procs.length; i++) {
100        procs[i] = new ClaimReplicationQueueRemoteProcedure(crashedServer, queues.get(i),
101          targetServers.get(i));
102      }
103      return procs;
104    } catch (ReplicationException e) {
105      if (retryCounter == null) {
106        retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
107      }
108      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
109      LOG.warn("Failed to claim replication queues for {}, suspend {}secs {}; {};", crashedServer,
110        backoff / 1000, e);
111      setTimeout(Math.toIntExact(backoff));
112      setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
113      skipPersistence();
114      throw new ProcedureSuspendedException();
115    }
116  }
117
118  @Override
119  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
120    setState(ProcedureProtos.ProcedureState.RUNNABLE);
121    env.getProcedureScheduler().addFront(this);
122    return false;
123  }
124
125  @Override
126  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
127    throw new UnsupportedOperationException();
128  }
129
130  @Override
131  protected boolean abort(MasterProcedureEnv env) {
132    return false;
133  }
134
135  @Override
136  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
137    serializer.serialize(ClaimReplicationQueuesStateData.newBuilder()
138      .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).build());
139  }
140
141  @Override
142  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
143    ClaimReplicationQueuesStateData data =
144      serializer.deserialize(ClaimReplicationQueuesStateData.class);
145    crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
146  }
147}