001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Optional;
024import org.apache.hadoop.hbase.ServerName;
025import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
026import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
027import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
028import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure;
029import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
030import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
031import org.apache.hadoop.hbase.replication.regionserver.ReplaySyncReplicationWALCallable;
032import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALRemoteStateData;
041
042/**
043 * A remote procedure which is used to send replaying remote wal work to region server.
044 */
045@InterfaceAudience.Private
046public class SyncReplicationReplayWALRemoteProcedure extends ServerRemoteProcedure
047  implements PeerProcedureInterface {
048
049  private static final Logger LOG =
050    LoggerFactory.getLogger(SyncReplicationReplayWALRemoteProcedure.class);
051
052  private String peerId;
053
054  private List<String> wals;
055
056  public SyncReplicationReplayWALRemoteProcedure() {
057  }
058
059  public SyncReplicationReplayWALRemoteProcedure(String peerId, List<String> wals,
060    ServerName targetServer) {
061    this.peerId = peerId;
062    this.wals = wals;
063    this.targetServer = targetServer;
064  }
065
066  @Override
067  public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
068    ReplaySyncReplicationWALParameter.Builder builder =
069      ReplaySyncReplicationWALParameter.newBuilder();
070    builder.setPeerId(peerId);
071    wals.stream().forEach(builder::addWal);
072    return Optional
073      .of(new ServerOperation(this, getProcId(), ReplaySyncReplicationWALCallable.class,
074        builder.build().toByteArray(), env.getMasterServices().getMasterActiveTime()));
075  }
076
077  protected boolean complete(MasterProcedureEnv env, Throwable error) {
078    if (error != null) {
079      LOG.warn("Replay wals {} on {} failed for peer id={}", wals, targetServer, peerId, error);
080      return false;
081    } else {
082      truncateWALs(env);
083      LOG.info("Replay wals {} on {} succeed for peer id={}", wals, targetServer, peerId);
084      return true;
085    }
086  }
087
088  /**
089   * Only truncate wals one by one when task succeed. The parent procedure will check the first wal
090   * length to know whether this task succeed.
091   */
092  private void truncateWALs(MasterProcedureEnv env) {
093    String firstWal = wals.get(0);
094    try {
095      env.getMasterServices().getSyncReplicationReplayWALManager().finishReplayWAL(firstWal);
096    } catch (IOException e) {
097      // As it is idempotent to rerun this task. Just ignore this exception and return.
098      LOG.warn("Failed to truncate wal {} for peer id={}", firstWal, peerId, e);
099      return;
100    }
101    for (int i = 1; i < wals.size(); i++) {
102      String wal = wals.get(i);
103      try {
104        env.getMasterServices().getSyncReplicationReplayWALManager().finishReplayWAL(wal);
105      } catch (IOException e1) {
106        try {
107          // retry
108          env.getMasterServices().getSyncReplicationReplayWALManager().finishReplayWAL(wal);
109        } catch (IOException e2) {
110          // As the parent procedure only check the first wal length. Just ignore this exception.
111          LOG.warn("Failed to truncate wal {} for peer id={}", wal, peerId, e2);
112        }
113      }
114    }
115  }
116
117  @Override
118  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
119    throw new UnsupportedOperationException();
120  }
121
122  @Override
123  protected boolean abort(MasterProcedureEnv env) {
124    return false;
125  }
126
127  @Override
128  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
129    SyncReplicationReplayWALRemoteStateData.Builder builder =
130      SyncReplicationReplayWALRemoteStateData.newBuilder().setPeerId(peerId)
131        .setTargetServer(ProtobufUtil.toServerName(targetServer)).setState(state);
132    wals.stream().forEach(builder::addWal);
133    if (this.remoteError != null) {
134      ErrorHandlingProtos.ForeignExceptionMessage fem =
135        ForeignExceptionUtil.toProtoForeignException(remoteError);
136      builder.setError(fem);
137    }
138    serializer.serialize(builder.build());
139  }
140
141  @Override
142  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
143    SyncReplicationReplayWALRemoteStateData data =
144      serializer.deserialize(SyncReplicationReplayWALRemoteStateData.class);
145    peerId = data.getPeerId();
146    wals = new ArrayList<>();
147    data.getWalList().forEach(wals::add);
148    targetServer = ProtobufUtil.toServerName(data.getTargetServer());
149    state = data.getState();
150    if (data.hasError()) {
151      this.remoteError = ForeignExceptionUtil.toException(data.getError());
152    }
153  }
154
155  @Override
156  public String getPeerId() {
157    return peerId;
158  }
159
160  @Override
161  public PeerOperationType getPeerOperationType() {
162    return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL_REMOTE;
163  }
164}