001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import java.io.IOException;
021import java.util.Optional;
022import org.apache.hadoop.hbase.ServerName;
023import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
024import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
025import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
026import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure;
027import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
028import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
029import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
030import org.apache.hadoop.hbase.replication.regionserver.RefreshPeerCallable;
031import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerStateData;
041
042@InterfaceAudience.Private
043public class RefreshPeerProcedure extends ServerRemoteProcedure
044  implements PeerProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> {
045
046  private static final Logger LOG = LoggerFactory.getLogger(RefreshPeerProcedure.class);
047
048  private String peerId;
049
050  private PeerOperationType type;
051
052  private int stage;
053
054  public RefreshPeerProcedure() {
055  }
056
057  public RefreshPeerProcedure(String peerId, PeerOperationType type, ServerName targetServer) {
058    this(peerId, type, targetServer, 0);
059  }
060
061  public RefreshPeerProcedure(String peerId, PeerOperationType type, ServerName targetServer,
062    int stage) {
063    this.peerId = peerId;
064    this.type = type;
065    this.targetServer = targetServer;
066    this.stage = stage;
067  }
068
069  @Override
070  public String getPeerId() {
071    return peerId;
072  }
073
074  @Override
075  public PeerOperationType getPeerOperationType() {
076    return PeerOperationType.REFRESH;
077  }
078
079  private static PeerModificationType toPeerModificationType(PeerOperationType type) {
080    switch (type) {
081      case ADD:
082        return PeerModificationType.ADD_PEER;
083      case REMOVE:
084        return PeerModificationType.REMOVE_PEER;
085      case ENABLE:
086        return PeerModificationType.ENABLE_PEER;
087      case DISABLE:
088        return PeerModificationType.DISABLE_PEER;
089      case UPDATE_CONFIG:
090        return PeerModificationType.UPDATE_PEER_CONFIG;
091      case TRANSIT_SYNC_REPLICATION_STATE:
092        return PeerModificationType.TRANSIT_SYNC_REPLICATION_STATE;
093      default:
094        throw new IllegalArgumentException("Unknown type: " + type);
095    }
096  }
097
098  private static PeerOperationType toPeerOperationType(PeerModificationType type) {
099    switch (type) {
100      case ADD_PEER:
101        return PeerOperationType.ADD;
102      case REMOVE_PEER:
103        return PeerOperationType.REMOVE;
104      case ENABLE_PEER:
105        return PeerOperationType.ENABLE;
106      case DISABLE_PEER:
107        return PeerOperationType.DISABLE;
108      case UPDATE_PEER_CONFIG:
109        return PeerOperationType.UPDATE_CONFIG;
110      case TRANSIT_SYNC_REPLICATION_STATE:
111        return PeerOperationType.TRANSIT_SYNC_REPLICATION_STATE;
112      default:
113        throw new IllegalArgumentException("Unknown type: " + type);
114    }
115  }
116
117  @Override
118  public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
119    assert targetServer.equals(remote);
120    return Optional.of(new ServerOperation(this, getProcId(), RefreshPeerCallable.class,
121      RefreshPeerParameter.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type))
122        .setTargetServer(ProtobufUtil.toServerName(remote)).setStage(stage).build().toByteArray(),
123      env.getMasterServices().getMasterActiveTime()));
124  }
125
126  @Override
127  protected boolean complete(MasterProcedureEnv env, Throwable error) {
128    if (error != null) {
129      LOG.warn("Refresh peer {} for {} on {} failed", peerId, type, targetServer, error);
130      return false;
131    } else {
132      LOG.info("Refresh peer {} for {} on {} suceeded", peerId, type, targetServer);
133      return true;
134    }
135  }
136
137  @Override
138  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
139    throw new UnsupportedOperationException();
140  }
141
142  @Override
143  protected boolean abort(MasterProcedureEnv env) {
144    // TODO: no correctness problem if we just ignore this, implement later.
145    return false;
146  }
147
148  @Override
149  protected boolean waitInitialized(MasterProcedureEnv env) {
150    return env.waitInitialized(this);
151  }
152
153  @Override
154  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
155    RefreshPeerStateData.Builder builder = RefreshPeerStateData.newBuilder();
156    if (this.remoteError != null) {
157      ErrorHandlingProtos.ForeignExceptionMessage fem =
158        ForeignExceptionUtil.toProtoForeignException(remoteError);
159      builder.setError(fem);
160    }
161    serializer.serialize(builder.setPeerId(peerId).setType(toPeerModificationType(type))
162      .setTargetServer(ProtobufUtil.toServerName(targetServer)).setStage(stage).setState(state)
163      .build());
164  }
165
166  @Override
167  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
168    RefreshPeerStateData data = serializer.deserialize(RefreshPeerStateData.class);
169    peerId = data.getPeerId();
170    type = toPeerOperationType(data.getType());
171    targetServer = ProtobufUtil.toServerName(data.getTargetServer());
172    stage = data.getStage();
173    state = data.getState();
174    if (data.hasError()) {
175      this.remoteError = ForeignExceptionUtil.toException(data.getError());
176    }
177  }
178}