001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import java.io.IOException;
021import java.util.Optional;
022import org.apache.hadoop.fs.Path;
023import org.apache.hadoop.hbase.ServerName;
024import org.apache.hadoop.hbase.master.MasterFileSystem;
025import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
026import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
027import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
028import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure;
029import org.apache.hadoop.hbase.procedure2.Procedure;
030import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
031import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
032import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
033import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
034import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
035import org.apache.hadoop.hbase.replication.ReplicationQueueId;
036import org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable;
037import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
038import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
044import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
045import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteParameter;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteStateData;
047
048@InterfaceAudience.Private
049public class ClaimReplicationQueueRemoteProcedure extends ServerRemoteProcedure
050  implements ServerProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> {
051
052  private static final Logger LOG =
053    LoggerFactory.getLogger(ClaimReplicationQueueRemoteProcedure.class);
054
055  private ReplicationQueueId queueId;
056
057  public ClaimReplicationQueueRemoteProcedure() {
058  }
059
060  public ClaimReplicationQueueRemoteProcedure(ReplicationQueueId queueId, ServerName targetServer) {
061    this.queueId = queueId;
062    this.targetServer = targetServer;
063  }
064
065  // check whether ReplicationSyncUp has already done the work for us, if so, we should skip
066  // claiming the replication queues and deleting them instead.
067  private boolean shouldSkip(MasterProcedureEnv env) throws IOException {
068    MasterFileSystem mfs = env.getMasterFileSystem();
069    Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
070    return mfs.getFileSystem().exists(new Path(syncUpDir, getServerName().getServerName()));
071  }
072
073  @Override
074  protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
075    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
076    try {
077      if (shouldSkip(env)) {
078        LOG.info("Skip claiming {} because replication sync up has already done it for us",
079          getServerName());
080        return null;
081      }
082    } catch (IOException e) {
083      LOG.warn("failed to check whether we should skip claiming {} due to replication sync up",
084        getServerName(), e);
085      // just finish the procedure here, as the AssignReplicationQueuesProcedure will reschedule
086      return null;
087    }
088    return super.execute(env);
089  }
090
091  @Override
092  public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
093    assert targetServer.equals(remote);
094    ClaimReplicationQueueRemoteParameter.Builder builder = ClaimReplicationQueueRemoteParameter
095      .newBuilder().setCrashedServer(ProtobufUtil.toServerName(queueId.getServerName()))
096      .setQueue(queueId.getPeerId());
097    queueId.getSourceServerName()
098      .ifPresent(sourceServer -> builder.setSourceServer(ProtobufUtil.toServerName(sourceServer)));
099    return Optional.of(new ServerOperation(this, getProcId(), ClaimReplicationQueueCallable.class,
100      builder.build().toByteArray(), env.getMasterServices().getMasterActiveTime()));
101  }
102
103  @Override
104  public ServerName getServerName() {
105    // return crashed server here, as we are going to recover its replication queues so we should
106    // use its scheduler queue instead of the one for the target server.
107    return queueId.getServerName();
108  }
109
110  @Override
111  public boolean hasMetaTableRegion() {
112    return false;
113  }
114
115  @Override
116  public ServerOperationType getServerOperationType() {
117    return ServerOperationType.CLAIM_REPLICATION_QUEUE_REMOTE;
118  }
119
120  @Override
121  protected boolean complete(MasterProcedureEnv env, Throwable error) {
122    if (error != null) {
123      LOG.warn("Failed to claim replication queue {} on server {} ", queueId, targetServer, error);
124      return false;
125    } else {
126      return true;
127    }
128  }
129
130  @Override
131  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
132    throw new UnsupportedOperationException();
133  }
134
135  @Override
136  protected boolean abort(MasterProcedureEnv env) {
137    return false;
138  }
139
140  @Override
141  protected boolean waitInitialized(MasterProcedureEnv env) {
142    return env.waitInitialized(this);
143  }
144
145  @Override
146  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
147    ClaimReplicationQueueRemoteStateData.Builder builder = ClaimReplicationQueueRemoteStateData
148      .newBuilder().setCrashedServer(ProtobufUtil.toServerName(queueId.getServerName()))
149      .setQueue(queueId.getPeerId()).setTargetServer(ProtobufUtil.toServerName(targetServer))
150      .setState(state);
151    if (this.remoteError != null) {
152      ErrorHandlingProtos.ForeignExceptionMessage fem =
153        ForeignExceptionUtil.toProtoForeignException(remoteError);
154      builder.setError(fem);
155    }
156    queueId.getSourceServerName()
157      .ifPresent(sourceServer -> builder.setSourceServer(ProtobufUtil.toServerName(sourceServer)));
158    serializer.serialize(builder.build());
159  }
160
161  @Override
162  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
163    ClaimReplicationQueueRemoteStateData data =
164      serializer.deserialize(ClaimReplicationQueueRemoteStateData.class);
165    targetServer = ProtobufUtil.toServerName(data.getTargetServer());
166    ServerName crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
167    String queue = data.getQueue();
168    state = data.getState();
169    if (data.hasSourceServer()) {
170      queueId = new ReplicationQueueId(crashedServer, queue,
171        ProtobufUtil.toServerName(data.getSourceServer()));
172    } else {
173      queueId = new ReplicationQueueId(crashedServer, queue);
174    }
175    if (data.hasError()) {
176      this.remoteError = ForeignExceptionUtil.toException(data.getError());
177    }
178  }
179}