001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import java.io.IOException;
021import java.util.Optional;
022import java.util.concurrent.CompletableFuture;
023import org.apache.hadoop.hbase.HConstants;
024import org.apache.hadoop.hbase.ServerName;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.RegionInfo;
027import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
028import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
029import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
030import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
031import org.apache.hadoop.hbase.procedure2.Procedure;
032import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
033import org.apache.hadoop.hbase.procedure2.ProcedureFutureUtil;
034import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
035import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
036import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
037import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
038import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
039import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
040import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
041import org.apache.hadoop.hbase.util.RetryCounter;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionRemoteProcedureBaseState;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionRemoteProcedureBaseStateData;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
052
053/**
054 * The base class for the remote procedures used to open/close a region.
055 * <p/>
056 * Notice that here we do not care about the result of the remote call, if the remote call is
057 * finished, either succeeded or not, we will always finish the procedure. The parent procedure
058 * should take care of the result and try to reschedule if the result is not good.
059 */
060@InterfaceAudience.Private
061public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedureEnv>
062  implements TableProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> {
063
064  private static final Logger LOG = LoggerFactory.getLogger(RegionRemoteProcedureBase.class);
065
066  protected RegionInfo region;
067
068  protected ServerName targetServer;
069
070  private RegionRemoteProcedureBaseState state =
071    RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH;
072
073  private TransitionCode transitionCode;
074
075  private long seqId;
076
077  private RetryCounter retryCounter;
078
079  private CompletableFuture<Void> future;
080
081  protected RegionRemoteProcedureBase() {
082  }
083
084  protected RegionRemoteProcedureBase(TransitRegionStateProcedure parent, RegionInfo region,
085    ServerName targetServer) {
086    this.region = region;
087    this.targetServer = targetServer;
088    parent.attachRemoteProc(this);
089  }
090
091  @Override
092  public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(MasterProcedureEnv env,
093    ServerName remote) {
094    // REPORT_SUCCEED means that this remote open/close request already executed in RegionServer.
095    // So return empty operation and RSProcedureDispatcher no need to send it again.
096    if (state == RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED) {
097      return Optional.empty();
098    }
099    return Optional.of(newRemoteOperation(env));
100  }
101
102  protected abstract RemoteProcedureDispatcher.RemoteOperation
103    newRemoteOperation(MasterProcedureEnv env);
104
105  @Override
106  public void remoteOperationCompleted(MasterProcedureEnv env) {
107    // should not be called since we use reportRegionStateTransition to report the result
108    throw new UnsupportedOperationException();
109  }
110
111  @Override
112  public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
113    // should not be called since we use reportRegionStateTransition to report the result
114    throw new UnsupportedOperationException();
115  }
116
117  private RegionStateNode getRegionNode(MasterProcedureEnv env) {
118    return env.getAssignmentManager().getRegionStates().getRegionStateNode(region);
119  }
120
121  @Override
122  public void remoteCallFailed(MasterProcedureEnv env, ServerName remote, IOException exception) {
123    RegionStateNode regionNode = getRegionNode(env);
124    regionNode.lock();
125    try {
126      if (!env.getMasterServices().getServerManager().isServerOnline(remote)) {
127        // the SCP will interrupt us, give up
128        LOG.debug("{} for region {}, targetServer {} is dead, SCP will interrupt us, give up", this,
129          regionNode, remote);
130        return;
131      }
132      if (state != RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH) {
133        // not sure how can this happen but anyway let's add a check here to avoid waking the wrong
134        // procedure...
135        LOG.warn("{} for region {}, targetServer={} has already been woken up, ignore", this,
136          regionNode, remote);
137        return;
138      }
139      LOG.warn("The remote operation {} for region {} to server {} failed", this, regionNode,
140        remote, exception);
141      // It is OK to not persist the state here, as we do not need to change the region state if the
142      // remote call is failed. If the master crashed before we actually execute the procedure and
143      // persist the new state, it is fine to retry on the same target server again.
144      state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH_FAIL;
145      regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
146    } finally {
147      regionNode.unlock();
148    }
149  }
150
151  @Override
152  public TableName getTableName() {
153    return region.getTable();
154  }
155
156  @Override
157  protected boolean waitInitialized(MasterProcedureEnv env) {
158    if (TableName.isMetaTableName(getTableName())) {
159      return false;
160    }
161    // First we need meta to be loaded, and second, if meta is not online then we will likely to
162    // fail when updating meta so we wait until it is assigned.
163    AssignmentManager am = env.getAssignmentManager();
164    return am.waitMetaLoaded(this) || am.waitMetaAssigned(this, region);
165  }
166
167  @Override
168  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
169    throw new UnsupportedOperationException();
170  }
171
172  @Override
173  protected boolean abort(MasterProcedureEnv env) {
174    return false;
175  }
176
177  // do some checks to see if the report is valid
178  protected abstract void checkTransition(RegionStateNode regionNode, TransitionCode transitionCode,
179    long seqId) throws UnexpectedStateException;
180
181  // change the in memory state of the regionNode, but do not update meta.
182  protected abstract void updateTransitionWithoutPersistingToMeta(MasterProcedureEnv env,
183    RegionStateNode regionNode, TransitionCode transitionCode, long seqId) throws IOException;
184
185  // A bit strange but the procedure store will throw RuntimeException if we can not persist the
186  // state, so upper layer should take care of this...
187  private void persistAndWake(MasterProcedureEnv env, RegionStateNode regionNode) {
188    // The synchronization here is to guard with ProcedureExecutor.executeRollback, as here we will
189    // not hold the procedure execution lock, but we should not persist a procedure in ROLLEDBACK
190    // state to the procedure store.
191    // The ProcedureStore.update must be inside the lock, so here the check for procedure state and
192    // update could be atomic. In ProcedureExecutor.cleanupAfterRollbackOneStep, we will set the
193    // state to ROLLEDBACK, which will hold the same lock too as the Procedure.setState method is
194    // synchronized. This is the key to keep us safe.
195    synchronized (this) {
196      if (getState() == ProcedureState.ROLLEDBACK) {
197        LOG.warn("Procedure {} has already been rolled back, skip persistent", this);
198        return;
199      }
200      env.getMasterServices().getMasterProcedureExecutor().getStore().update(this);
201    }
202    regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
203  }
204
205  // should be called with RegionStateNode locked, to avoid race with the execute method below
206  void reportTransition(MasterProcedureEnv env, RegionStateNode regionNode, ServerName serverName,
207    TransitionCode transitionCode, long seqId) throws IOException {
208    if (state != RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH) {
209      // should be a retry
210      return;
211    }
212    if (!targetServer.equals(serverName)) {
213      throw new UnexpectedStateException("Received report from " + serverName + ", expected "
214        + targetServer + ", " + regionNode + ", proc=" + this);
215    }
216    checkTransition(regionNode, transitionCode, seqId);
217    // this state means we have received the report from RS, does not mean the result is fine, as we
218    // may received a FAILED_OPEN.
219    this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED;
220    this.transitionCode = transitionCode;
221    this.seqId = seqId;
222    // Persist the transition code and openSeqNum(if provided).
223    // We should not update the hbase:meta directly as this may cause races when master restarts,
224    // as the old active master may incorrectly report back to RS and cause the new master to hang
225    // on a OpenRegionProcedure forever. See HBASE-22060 and HBASE-22074 for more details.
226    boolean succ = false;
227    try {
228      persistAndWake(env, regionNode);
229      succ = true;
230    } finally {
231      if (!succ) {
232        this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH;
233        this.transitionCode = null;
234        this.seqId = HConstants.NO_SEQNUM;
235      }
236    }
237    try {
238      updateTransitionWithoutPersistingToMeta(env, regionNode, transitionCode, seqId);
239    } catch (IOException e) {
240      throw new AssertionError("should not happen", e);
241    }
242  }
243
244  void serverCrashed(MasterProcedureEnv env, RegionStateNode regionNode, ServerName serverName) {
245    if (state == RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_SERVER_CRASH) {
246      // should be a retry
247      return;
248    }
249    RegionRemoteProcedureBaseState oldState = state;
250    // it is possible that the state is in REGION_REMOTE_PROCEDURE_SERVER_CRASH, think of this
251    // sequence
252    // 1. region is open on the target server and the above reportTransition call is succeeded
253    // 2. before we are woken up and update the meta, the target server crashes, and then we arrive
254    // here
255    this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_SERVER_CRASH;
256    boolean succ = false;
257    try {
258      persistAndWake(env, regionNode);
259      succ = true;
260    } finally {
261      if (!succ) {
262        this.state = oldState;
263      }
264    }
265  }
266
267  protected abstract void restoreSucceedState(AssignmentManager am, RegionStateNode regionNode,
268    long seqId) throws IOException;
269
270  void stateLoaded(AssignmentManager am, RegionStateNode regionNode) {
271    if (state == RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED) {
272      try {
273        restoreSucceedState(am, regionNode, seqId);
274      } catch (IOException e) {
275        // should not happen as we are just restoring the state
276        throw new AssertionError(e);
277      }
278    }
279  }
280
281  private TransitRegionStateProcedure getParent(MasterProcedureEnv env) {
282    return (TransitRegionStateProcedure) env.getMasterServices().getMasterProcedureExecutor()
283      .getProcedure(getParentProcId());
284  }
285
286  private void unattach(MasterProcedureEnv env) {
287    getParent(env).unattachRemoteProc(this);
288  }
289
290  private CompletableFuture<Void> getFuture() {
291    return future;
292  }
293
294  private void setFuture(CompletableFuture<Void> f) {
295    future = f;
296  }
297
298  @Override
299  protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
300    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
301    RegionStateNode regionNode = getRegionNode(env);
302    if (!regionNode.isLockedBy(this)) {
303      regionNode.lock(this, () -> ProcedureFutureUtil.wakeUp(this, env));
304    }
305    try {
306      switch (state) {
307        case REGION_REMOTE_PROCEDURE_DISPATCH: {
308          // The code which wakes us up also needs to lock the RSN so here we do not need to
309          // synchronize on the event.
310          ProcedureEvent<?> event = regionNode.getProcedureEvent();
311          try {
312            env.getRemoteDispatcher().addOperationToNode(targetServer, this);
313          } catch (FailedRemoteDispatchException e) {
314            LOG.warn("Can not add remote operation {} for region {} to server {}, this usually "
315              + "because the server is alread dead, give up and mark the procedure as complete, "
316              + "the parent procedure will take care of this.", this, region, targetServer, e);
317            unattach(env);
318            return null;
319          }
320          event.suspend();
321          event.suspendIfNotReady(this);
322          throw new ProcedureSuspendedException();
323        }
324        case REGION_REMOTE_PROCEDURE_REPORT_SUCCEED:
325          if (
326            ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
327              () -> unattach(env))
328          ) {
329            return null;
330          }
331          ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
332            env.getAssignmentManager().persistToMeta(regionNode), env, () -> unattach(env));
333          return null;
334        case REGION_REMOTE_PROCEDURE_DISPATCH_FAIL:
335          // the remote call is failed so we do not need to change the region state, just return.
336          unattach(env);
337          return null;
338        case REGION_REMOTE_PROCEDURE_SERVER_CRASH:
339          if (
340            ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
341              () -> unattach(env))
342          ) {
343            return null;
344          }
345          ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
346            env.getAssignmentManager().regionClosedAbnormally(regionNode), env,
347            () -> unattach(env));
348          return null;
349        default:
350          throw new IllegalStateException("Unknown state: " + state);
351      }
352    } catch (IOException e) {
353      if (retryCounter == null) {
354        retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
355      }
356      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
357      LOG.warn("Failed updating meta, suspend {}secs {}; {};", backoff / 1000, this, regionNode, e);
358      throw suspend(Math.toIntExact(backoff), true);
359    } finally {
360      if (future == null) {
361        regionNode.unlock(this);
362      }
363    }
364  }
365
366  @Override
367  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
368    setState(ProcedureProtos.ProcedureState.RUNNABLE);
369    env.getProcedureScheduler().addFront(this);
370    return false; // 'false' means that this procedure handled the timeout
371  }
372
373  @Override
374  public boolean storeInDispatchedQueue() {
375    return false;
376  }
377
378  @Override
379  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
380    RegionRemoteProcedureBaseStateData.Builder builder =
381      RegionRemoteProcedureBaseStateData.newBuilder().setRegion(ProtobufUtil.toRegionInfo(region))
382        .setTargetServer(ProtobufUtil.toServerName(targetServer)).setState(state);
383    if (transitionCode != null) {
384      builder.setTransitionCode(transitionCode);
385      builder.setSeqId(seqId);
386    }
387    serializer.serialize(builder.build());
388  }
389
390  @Override
391  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
392    RegionRemoteProcedureBaseStateData data =
393      serializer.deserialize(RegionRemoteProcedureBaseStateData.class);
394    region = ProtobufUtil.toRegionInfo(data.getRegion());
395    targetServer = ProtobufUtil.toServerName(data.getTargetServer());
396    // 'state' may not be present if we are reading an 'old' form of this pb Message.
397    if (data.hasState()) {
398      state = data.getState();
399    }
400    if (data.hasTransitionCode()) {
401      transitionCode = data.getTransitionCode();
402      seqId = data.getSeqId();
403    }
404  }
405
406  @Override
407  protected void afterReplay(MasterProcedureEnv env) {
408    getParent(env).attachRemoteProc(this);
409  }
410
411  @Override
412  public String getProcName() {
413    return getClass().getSimpleName() + " " + region.getEncodedName();
414  }
415
416  @Override
417  protected void toStringClassDetails(StringBuilder builder) {
418    builder.append(getProcName());
419    if (targetServer != null) {
420      builder.append(", server=");
421      builder.append(this.targetServer);
422    }
423    if (this.retryCounter != null) {
424      builder.append(", retry=");
425      builder.append(this.retryCounter);
426    }
427  }
428}