001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.Map;
024import java.util.concurrent.ThreadLocalRandom;
025import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
026import org.apache.hadoop.hbase.metrics.Counter;
027import org.apache.hadoop.hbase.metrics.Histogram;
028import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
029import org.apache.hadoop.hbase.procedure2.util.StringUtils;
030import org.apache.hadoop.hbase.security.User;
031import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
032import org.apache.hadoop.hbase.util.NonceKey;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
039
040/**
041 * Base Procedure class responsible for Procedure Metadata; e.g. state, submittedTime, lastUpdate,
042 * stack-indexes, etc.
043 * <p/>
044 * Procedures are run by a {@link ProcedureExecutor} instance. They are submitted and then the
045 * ProcedureExecutor keeps calling {@link #execute(Object)} until the Procedure is done. Execute may
046 * be called multiple times in the case of failure or a restart, so code must be idempotent. The
047 * return from an execute call is either: null to indicate we are done; ourself if there is more to
048 * do; or, a set of sub-procedures that need to be run to completion before the framework resumes
049 * our execution.
050 * <p/>
051 * The ProcedureExecutor keeps its notion of Procedure State in the Procedure itself; e.g. it stamps
052 * the Procedure as INITIALIZING, RUNNABLE, SUCCESS, etc. Here are some of the States defined in the
053 * ProcedureState enum from protos:
054 * <ul>
055 * <li>{@link #isFailed()} A procedure has executed at least once and has failed. The procedure may
056 * or may not have rolled back yet. Any procedure in FAILED state will be eventually moved to
057 * ROLLEDBACK state.</li>
058 * <li>{@link #isSuccess()} A procedure is completed successfully without exception.</li>
059 * <li>{@link #isFinished()} As a procedure in FAILED state will be tried forever for rollback, only
060 * condition when scheduler/ executor will drop procedure from further processing is when procedure
061 * state is ROLLEDBACK or isSuccess() returns true. This is a terminal state of the procedure.</li>
062 * <li>{@link #isWaiting()} - Procedure is in one of the two waiting states
063 * ({@link ProcedureState#WAITING}, {@link ProcedureState#WAITING_TIMEOUT}).</li>
064 * </ul>
065 * NOTE: These states are of the ProcedureExecutor. Procedure implementations in turn can keep their
066 * own state. This can lead to confusion. Try to keep the two distinct.
067 * <p/>
068 * rollback() is called when the procedure or one of the sub-procedures has failed. The rollback
069 * step is supposed to cleanup the resources created during the execute() step. In case of failure
070 * and restart, rollback() may be called multiple times, so again the code must be idempotent.
071 * <p/>
072 * Procedure can be made respect a locking regime. It has acquire/release methods as well as an
073 * {@link #hasLock()}. The lock implementation is up to the implementor. If an entity needs to be
074 * locked for the life of a procedure -- not just the calls to execute -- then implementations
075 * should say so with the {@link #holdLock(Object)} method.
076 * <p/>
077 * And since we need to restore the lock when restarting to keep the logic correct(HBASE-20846), the
078 * implementation is a bit tricky so we add some comments hrre about it.
079 * <ul>
080 * <li>Make {@link #hasLock()} method final, and add a {@link #locked} field in Procedure to record
081 * whether we have the lock. We will set it to {@code true} in
082 * {@link #doAcquireLock(Object, ProcedureStore)} and to {@code false} in
083 * {@link #doReleaseLock(Object, ProcedureStore)}. The sub classes do not need to manage it any
084 * more.</li>
085 * <li>Also added a locked field in the proto message. When storing, the field will be set according
086 * to the return value of {@link #hasLock()}. And when loading, there is a new field in Procedure
087 * called {@link #lockedWhenLoading}. We will set it to {@code true} if the locked field in proto
088 * message is {@code true}.</li>
089 * <li>The reason why we can not set the {@link #locked} field directly to {@code true} by calling
090 * {@link #doAcquireLock(Object, ProcedureStore)} is that, during initialization, most procedures
091 * need to wait until master is initialized. So the solution here is that, we introduced a new
092 * method called {@link #waitInitialized(Object)} in Procedure, and move the wait master initialized
093 * related code from {@link #acquireLock(Object)} to this method. And we added a restoreLock method
094 * to Procedure, if {@link #lockedWhenLoading} is {@code true}, we will call the
095 * {@link #acquireLock(Object)} to get the lock, but do not set {@link #locked} to true. And later
096 * when we call {@link #doAcquireLock(Object, ProcedureStore)} and pass the
097 * {@link #waitInitialized(Object)} check, we will test {@link #lockedWhenLoading}, if it is
098 * {@code true}, when we just set the {@link #locked} field to true and return, without actually
099 * calling the {@link #acquireLock(Object)} method since we have already called it once.</li>
100 * </ul>
101 * <p/>
102 * Procedures can be suspended or put in wait state with a callback that gets executed on
103 * Procedure-specified timeout. See {@link #setTimeout(int)}}, and
104 * {@link #setTimeoutFailure(Object)}. See TestProcedureEvents and the TestTimeoutEventProcedure
105 * class for an example usage.
106 * </p>
107 * <p/>
108 * There are hooks for collecting metrics on submit of the procedure and on finish. See
109 * {@link #updateMetricsOnSubmit(Object)} and {@link #updateMetricsOnFinish(Object, long, boolean)}.
110 */
111@InterfaceAudience.Private
112public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TEnvironment>> {
113  private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
114  public static final long NO_PROC_ID = -1;
115  protected static final int NO_TIMEOUT = -1;
116
117  public enum LockState {
118    LOCK_ACQUIRED, // Lock acquired and ready to execute
119    LOCK_YIELD_WAIT, // Lock not acquired, framework needs to yield
120    LOCK_EVENT_WAIT, // Lock not acquired, an event will yield the procedure
121  }
122
123  // Unchanged after initialization
124  private NonceKey nonceKey = null;
125  private String owner = null;
126  private long parentProcId = NO_PROC_ID;
127  private long rootProcId = NO_PROC_ID;
128  private long procId = NO_PROC_ID;
129  private long submittedTime;
130
131  // Runtime state, updated every operation
132  private ProcedureState state = ProcedureState.INITIALIZING;
133  private RemoteProcedureException exception = null;
134  private int[] stackIndexes = null;
135  private int childrenLatch = 0;
136  // since we do not always maintain stackIndexes if the root procedure does not support rollback,
137  // we need a separated flag to indicate whether a procedure was executed
138  private boolean wasExecuted;
139
140  private volatile int timeout = NO_TIMEOUT;
141  private volatile long lastUpdate;
142
143  private volatile byte[] result = null;
144
145  private volatile boolean locked = false;
146
147  private boolean lockedWhenLoading = false;
148
149  /**
150   * Used for override complete of the procedure without actually doing any logic in the procedure.
151   * If bypass is set to true, when executing it will return null when {@link #doExecute(Object)} is
152   * called to finish the procedure and release any locks it may currently hold. The bypass does
153   * cleanup around the Procedure as far as the Procedure framework is concerned. It does not clean
154   * any internal state that the Procedure's themselves may have set. That is for the Procedures to
155   * do themselves when bypass is called. They should override bypass and do their cleanup in the
156   * overridden bypass method (be sure to call the parent bypass to ensure proper processing).
157   * <p>
158   * </p>
159   * Bypassing a procedure is not like aborting. Aborting a procedure will trigger a rollback. And
160   * since the {@link #abort(Object)} method is overrideable Some procedures may have chosen to
161   * ignore the aborting.
162   */
163  private volatile boolean bypass = false;
164
165  /**
166   * Indicate whether we need to persist the procedure to ProcedureStore after execution. Default to
167   * true, and the implementation can all {@link #skipPersistence()} to let the framework skip the
168   * persistence of the procedure.
169   * <p/>
170   * This is useful when the procedure is in error and you want to retry later. The retry interval
171   * and the number of retries are usually not critical so skip the persistence can save some
172   * resources, and also speed up the restart processing.
173   * <p/>
174   * Notice that this value will be reset to true every time before execution. And when rolling back
175   * we do not test this value.
176   */
177  private boolean persist = true;
178
179  public boolean isBypass() {
180    return bypass;
181  }
182
183  /**
184   * Set the bypass to true. Only called in
185   * {@link ProcedureExecutor#bypassProcedure(long, long, boolean, boolean)} for now. DO NOT use
186   * this method alone, since we can't just bypass one single procedure. We need to bypass its
187   * ancestor too. If your Procedure has set state, it needs to undo it in here.
188   * @param env Current environment. May be null because of context; e.g. pretty-printing procedure
189   *            WALs where there is no 'environment' (and where Procedures that require an
190   *            'environment' won't be run.
191   */
192  protected void bypass(TEnvironment env) {
193    this.bypass = true;
194  }
195
196  boolean needPersistence() {
197    return persist;
198  }
199
200  void resetPersistence() {
201    persist = true;
202  }
203
204  protected final void skipPersistence() {
205    persist = false;
206  }
207
208  /**
209   * The main code of the procedure. It must be idempotent since execute() may be called multiple
210   * times in case of machine failure in the middle of the execution.
211   * @param env the environment passed to the ProcedureExecutor
212   * @return a set of sub-procedures to run or ourselves if there is more work to do or null if the
213   *         procedure is done.
214   * @throws ProcedureYieldException     the procedure will be added back to the queue and retried
215   *                                     later.
216   * @throws InterruptedException        the procedure will be added back to the queue and retried
217   *                                     later.
218   * @throws ProcedureSuspendedException Signal to the executor that Procedure has suspended itself
219   *                                     and has set itself up waiting for an external event to wake
220   *                                     it back up again.
221   */
222  protected abstract Procedure<TEnvironment>[] execute(TEnvironment env)
223    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException;
224
225  /**
226   * The code to undo what was done by the execute() code. It is called when the procedure or one of
227   * the sub-procedures failed or an abort was requested. It should cleanup all the resources
228   * created by the execute() call. The implementation must be idempotent since rollback() may be
229   * called multiple time in case of machine failure in the middle of the execution.
230   * @param env the environment passed to the ProcedureExecutor
231   * @throws IOException          temporary failure, the rollback will retry later
232   * @throws InterruptedException the procedure will be added back to the queue and retried later
233   */
234  protected abstract void rollback(TEnvironment env) throws IOException, InterruptedException;
235
236  /**
237   * The abort() call is asynchronous and each procedure must decide how to deal with it, if they
238   * want to be abortable. The simplest implementation is to have an AtomicBoolean set in the
239   * abort() method and then the execute() will check if the abort flag is set or not. abort() may
240   * be called multiple times from the client, so the implementation must be idempotent.
241   * <p>
242   * NOTE: abort() is not like Thread.interrupt(). It is just a notification that allows the
243   * procedure implementor abort.
244   */
245  protected abstract boolean abort(TEnvironment env);
246
247  /**
248   * The user-level code of the procedure may have some state to persist (e.g. input arguments or
249   * current position in the processing state) to be able to resume on failure.
250   * @param serializer stores the serializable state
251   */
252  protected abstract void serializeStateData(ProcedureStateSerializer serializer)
253    throws IOException;
254
255  /**
256   * Called on store load to allow the user to decode the previously serialized state.
257   * @param serializer contains the serialized state
258   */
259  protected abstract void deserializeStateData(ProcedureStateSerializer serializer)
260    throws IOException;
261
262  /**
263   * The {@link #doAcquireLock(Object, ProcedureStore)} will be split into two steps, first, it will
264   * call us to determine whether we need to wait for initialization, second, it will call
265   * {@link #acquireLock(Object)} to actually handle the lock for this procedure.
266   * <p/>
267   * This is because that when master restarts, we need to restore the lock state for all the
268   * procedures to not break the semantic if {@link #holdLock(Object)} is true. But the
269   * {@link ProcedureExecutor} will be started before the master finish initialization(as it is part
270   * of the initialization!), so we need to split the code into two steps, and when restore, we just
271   * restore the lock part and ignore the waitInitialized part. Otherwise there will be dead lock.
272   * @return true means we need to wait until the environment has been initialized, otherwise true.
273   */
274  protected boolean waitInitialized(TEnvironment env) {
275    return false;
276  }
277
278  /**
279   * The user should override this method if they need a lock on an Entity. A lock can be anything,
280   * and it is up to the implementor. The Procedure Framework will call this method just before it
281   * invokes {@link #execute(Object)}. It calls {@link #releaseLock(Object)} after the call to
282   * execute.
283   * <p/>
284   * If you need to hold the lock for the life of the Procedure -- i.e. you do not want any other
285   * Procedure interfering while this Procedure is running, see {@link #holdLock(Object)}.
286   * <p/>
287   * Example: in our Master we can execute request in parallel for different tables. We can create
288   * t1 and create t2 and these creates can be executed at the same time. Anything else on t1/t2 is
289   * queued waiting that specific table create to happen.
290   * <p/>
291   * There are 3 LockState:
292   * <ul>
293   * <li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is ready to
294   * execute.</li>
295   * <li>LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework should
296   * take care of readding the procedure back to the runnable set for retry</li>
297   * <li>LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will take
298   * care of readding the procedure back to the runnable set when the lock is available.</li>
299   * </ul>
300   * @return the lock state as described above.
301   */
302  protected LockState acquireLock(TEnvironment env) {
303    return LockState.LOCK_ACQUIRED;
304  }
305
306  /**
307   * The user should override this method, and release lock if necessary.
308   */
309  protected void releaseLock(TEnvironment env) {
310    // no-op
311  }
312
313  /**
314   * Used to keep the procedure lock even when the procedure is yielding or suspended.
315   * @return true if the procedure should hold on the lock until completionCleanup()
316   */
317  protected boolean holdLock(TEnvironment env) {
318    return false;
319  }
320
321  /**
322   * This is used in conjunction with {@link #holdLock(Object)}. If {@link #holdLock(Object)}
323   * returns true, the procedure executor will call acquireLock() once and thereafter not call
324   * {@link #releaseLock(Object)} until the Procedure is done (Normally, it calls release/acquire
325   * around each invocation of {@link #execute(Object)}.
326   * @see #holdLock(Object)
327   * @return true if the procedure has the lock, false otherwise.
328   */
329  public final boolean hasLock() {
330    return locked;
331  }
332
333  /**
334   * Called when the procedure is loaded for replay. The procedure implementor may use this method
335   * to perform some quick operation before replay. e.g. failing the procedure if the state on
336   * replay may be unknown.
337   */
338  protected void beforeReplay(TEnvironment env) {
339    // no-op
340  }
341
342  /**
343   * Called when the procedure is ready to be added to the queue after the loading/replay operation.
344   */
345  protected void afterReplay(TEnvironment env) {
346    // no-op
347  }
348
349  /**
350   * Called before we call the execute method of this procedure, but after we acquire the execution
351   * lock and procedure scheduler lock.
352   */
353  protected void beforeExec(TEnvironment env) throws ProcedureSuspendedException {
354    // no-op
355  }
356
357  /**
358   * Called after we call the execute method of this procedure, and also after we initialize all the
359   * sub procedures and persist the the state if persistence is needed.
360   * <p>
361   * This is for doing some hooks after we initialize the sub procedures. See HBASE-29259 for more
362   * details on why we can not release the region lock inside the execute method.
363   */
364  protected void afterExec(TEnvironment env) {
365    // no-op
366  }
367
368  /**
369   * Called when the procedure is marked as completed (success or rollback). The procedure
370   * implementor may use this method to cleanup in-memory states. This operation will not be retried
371   * on failure. If a procedure took a lock, it will have been released when this method runs.
372   */
373  protected void completionCleanup(TEnvironment env) {
374    // no-op
375  }
376
377  /**
378   * By default, the procedure framework/executor will try to run procedures start to finish. Return
379   * true to make the executor yield between each execution step to give other procedures a chance
380   * to run.
381   * @param env the environment passed to the ProcedureExecutor
382   * @return Return true if the executor should yield on completion of an execution step. Defaults
383   *         to return false.
384   */
385  protected boolean isYieldAfterExecutionStep(TEnvironment env) {
386    return false;
387  }
388
389  /**
390   * By default, the executor will keep the procedure result around util the eviction TTL is
391   * expired. The client can cut down the waiting time by requesting that the result is removed from
392   * the executor. In case of system started procedure, we can force the executor to auto-ack.
393   * @param env the environment passed to the ProcedureExecutor
394   * @return true if the executor should wait the client ack for the result. Defaults to return
395   *         true.
396   */
397  protected boolean shouldWaitClientAck(TEnvironment env) {
398    return true;
399  }
400
401  /**
402   * Override this method to provide procedure specific counters for submitted count, failed count
403   * and time histogram.
404   * @param env The environment passed to the procedure executor
405   * @return Container object for procedure related metric
406   */
407  protected ProcedureMetrics getProcedureMetrics(TEnvironment env) {
408    return null;
409  }
410
411  /**
412   * This function will be called just when procedure is submitted for execution. Override this
413   * method to update the metrics at the beginning of the procedure. The default implementation
414   * updates submitted counter if {@link #getProcedureMetrics(Object)} returns non-null
415   * {@link ProcedureMetrics}.
416   */
417  protected void updateMetricsOnSubmit(TEnvironment env) {
418    ProcedureMetrics metrics = getProcedureMetrics(env);
419    if (metrics == null) {
420      return;
421    }
422
423    Counter submittedCounter = metrics.getSubmittedCounter();
424    if (submittedCounter != null) {
425      submittedCounter.increment();
426    }
427  }
428
429  /**
430   * This function will be called just after procedure execution is finished. Override this method
431   * to update metrics at the end of the procedure. If {@link #getProcedureMetrics(Object)} returns
432   * non-null {@link ProcedureMetrics}, the default implementation adds runtime of a procedure to a
433   * time histogram for successfully completed procedures. Increments failed counter for failed
434   * procedures.
435   * <p/>
436   * TODO: As any of the sub-procedures on failure rolls back all procedures in the stack, including
437   * successfully finished siblings, this function may get called twice in certain cases for certain
438   * procedures. Explore further if this can be called once.
439   * @param env     The environment passed to the procedure executor
440   * @param runtime Runtime of the procedure in milliseconds
441   * @param success true if procedure is completed successfully
442   */
443  protected void updateMetricsOnFinish(TEnvironment env, long runtime, boolean success) {
444    ProcedureMetrics metrics = getProcedureMetrics(env);
445    if (metrics == null) {
446      return;
447    }
448
449    if (success) {
450      Histogram timeHisto = metrics.getTimeHisto();
451      if (timeHisto != null) {
452        timeHisto.update(runtime);
453      }
454    } else {
455      Counter failedCounter = metrics.getFailedCounter();
456      if (failedCounter != null) {
457        failedCounter.increment();
458      }
459    }
460  }
461
462  @Override
463  public String toString() {
464    // Return the simple String presentation of the procedure.
465    return toStringSimpleSB().toString();
466  }
467
468  /**
469   * Build the StringBuilder for the simple form of procedure string.
470   * @return the StringBuilder
471   */
472  protected StringBuilder toStringSimpleSB() {
473    final StringBuilder sb = new StringBuilder();
474
475    sb.append("pid=");
476    sb.append(getProcId());
477
478    if (hasParent()) {
479      sb.append(", ppid=");
480      sb.append(getParentProcId());
481    }
482
483    /*
484     * TODO Enable later when this is being used. Currently owner not used. if (hasOwner()) {
485     * sb.append(", owner="); sb.append(getOwner()); }
486     */
487
488    sb.append(", state="); // pState for Procedure State as opposed to any other kind.
489    toStringState(sb);
490
491    sb.append(", hasLock=").append(locked);
492
493    if (bypass) {
494      sb.append(", bypass=").append(bypass);
495    }
496
497    if (hasException()) {
498      sb.append(", exception=" + getException());
499    }
500
501    sb.append("; ");
502    toStringClassDetails(sb);
503
504    return sb;
505  }
506
507  /**
508   * Extend the toString() information with more procedure details
509   */
510  public String toStringDetails() {
511    final StringBuilder sb = toStringSimpleSB();
512
513    sb.append(" submittedTime=");
514    sb.append(getSubmittedTime());
515
516    sb.append(", lastUpdate=");
517    sb.append(getLastUpdate());
518
519    final int[] stackIndices = getStackIndexes();
520    if (stackIndices != null) {
521      sb.append("\n");
522      sb.append("stackIndexes=");
523      sb.append(Arrays.toString(stackIndices));
524    }
525
526    return sb.toString();
527  }
528
529  protected String toStringClass() {
530    StringBuilder sb = new StringBuilder();
531    toStringClassDetails(sb);
532    return sb.toString();
533  }
534
535  /**
536   * Called from {@link #toString()} when interpolating {@link Procedure} State. Allows decorating
537   * generic Procedure State with Procedure particulars.
538   * @param builder Append current {@link ProcedureState}
539   */
540  protected void toStringState(StringBuilder builder) {
541    builder.append(getState());
542  }
543
544  /**
545   * Extend the toString() information with the procedure details e.g. className and parameters
546   * @param builder the string builder to use to append the proc specific information
547   */
548  protected void toStringClassDetails(StringBuilder builder) {
549    builder.append(getClass().getName());
550  }
551
552  // ==========================================================================
553  // Those fields are unchanged after initialization.
554  //
555  // Each procedure will get created from the user or during
556  // ProcedureExecutor.start() during the load() phase and then submitted
557  // to the executor. these fields will never be changed after initialization
558  // ==========================================================================
559  public long getProcId() {
560    return procId;
561  }
562
563  public boolean hasParent() {
564    return parentProcId != NO_PROC_ID;
565  }
566
567  public long getParentProcId() {
568    return parentProcId;
569  }
570
571  public long getRootProcId() {
572    return rootProcId;
573  }
574
575  public String getProcName() {
576    return toStringClass();
577  }
578
579  public NonceKey getNonceKey() {
580    return nonceKey;
581  }
582
583  public long getSubmittedTime() {
584    return submittedTime;
585  }
586
587  public String getOwner() {
588    return owner;
589  }
590
591  public boolean hasOwner() {
592    return owner != null;
593  }
594
595  /**
596   * Called by the ProcedureExecutor to assign the ID to the newly created procedure.
597   */
598  protected void setProcId(long procId) {
599    this.procId = procId;
600    this.submittedTime = EnvironmentEdgeManager.currentTime();
601    setState(ProcedureState.RUNNABLE);
602  }
603
604  /**
605   * Called by the ProcedureExecutor to assign the parent to the newly created procedure.
606   */
607  protected void setParentProcId(long parentProcId) {
608    this.parentProcId = parentProcId;
609  }
610
611  protected void setRootProcId(long rootProcId) {
612    this.rootProcId = rootProcId;
613  }
614
615  /**
616   * Called by the ProcedureExecutor to set the value to the newly created procedure.
617   */
618  protected void setNonceKey(NonceKey nonceKey) {
619    this.nonceKey = nonceKey;
620  }
621
622  public void setOwner(String owner) {
623    this.owner = StringUtils.isEmpty(owner) ? null : owner;
624  }
625
626  public void setOwner(User owner) {
627    assert owner != null : "expected owner to be not null";
628    setOwner(owner.getShortName());
629  }
630
631  /**
632   * Called on store load to initialize the Procedure internals after the creation/deserialization.
633   */
634  protected void setSubmittedTime(long submittedTime) {
635    this.submittedTime = submittedTime;
636  }
637
638  // ==========================================================================
639  // runtime state - timeout related
640  // ==========================================================================
641  /**
642   * @param timeout timeout interval in msec
643   */
644  protected void setTimeout(int timeout) {
645    this.timeout = timeout;
646  }
647
648  public boolean hasTimeout() {
649    return timeout != NO_TIMEOUT;
650  }
651
652  /** Returns the timeout in msec */
653  public int getTimeout() {
654    return timeout;
655  }
656
657  /**
658   * Called on store load to initialize the Procedure internals after the creation/deserialization.
659   */
660  protected void setLastUpdate(long lastUpdate) {
661    this.lastUpdate = lastUpdate;
662  }
663
664  /**
665   * Called by ProcedureExecutor after each time a procedure step is executed.
666   */
667  protected void updateTimestamp() {
668    this.lastUpdate = EnvironmentEdgeManager.currentTime();
669  }
670
671  public long getLastUpdate() {
672    return lastUpdate;
673  }
674
675  /**
676   * Timeout of the next timeout. Called by the ProcedureExecutor if the procedure has timeout set
677   * and the procedure is in the waiting queue.
678   * @return the timestamp of the next timeout.
679   */
680  protected long getTimeoutTimestamp() {
681    return getLastUpdate() + getTimeout();
682  }
683
684  // ==========================================================================
685  // runtime state
686  // ==========================================================================
687  /** Returns the time elapsed between the last update and the start time of the procedure. */
688  public long elapsedTime() {
689    return getLastUpdate() - getSubmittedTime();
690  }
691
692  /** Returns the serialized result if any, otherwise null */
693  public byte[] getResult() {
694    return result;
695  }
696
697  /**
698   * The procedure may leave a "result" on completion.
699   * @param result the serialized result that will be passed to the client
700   */
701  protected void setResult(byte[] result) {
702    this.result = result;
703  }
704
705  /**
706   * Will only be called when loading procedures from procedure store, where we need to record
707   * whether the procedure has already held a lock. Later we will call {@link #restoreLock(Object)}
708   * to actually acquire the lock.
709   */
710  final void lockedWhenLoading() {
711    this.lockedWhenLoading = true;
712  }
713
714  /**
715   * Can only be called when restarting, before the procedure actually being executed, as after we
716   * actually call the {@link #doAcquireLock(Object, ProcedureStore)} method, we will reset
717   * {@link #lockedWhenLoading} to false.
718   * <p/>
719   * Now it is only used in the ProcedureScheduler to determine whether we should put a Procedure in
720   * front of a queue.
721   */
722  public boolean isLockedWhenLoading() {
723    return lockedWhenLoading;
724  }
725
726  // ==============================================================================================
727  // Runtime state, updated every operation by the ProcedureExecutor
728  //
729  // There is always 1 thread at the time operating on the state of the procedure.
730  // The ProcedureExecutor may check and set states, or some Procecedure may
731  // update its own state. but no concurrent updates. we use synchronized here
732  // just because the procedure can get scheduled on different executor threads on each step.
733  // ==============================================================================================
734
735  /** Returns true if the procedure is in a RUNNABLE state. */
736  public synchronized boolean isRunnable() {
737    return state == ProcedureState.RUNNABLE;
738  }
739
740  public synchronized boolean isInitializing() {
741    return state == ProcedureState.INITIALIZING;
742  }
743
744  /** Returns true if the procedure has failed. It may or may not have rolled back. */
745  public synchronized boolean isFailed() {
746    return state == ProcedureState.FAILED || state == ProcedureState.ROLLEDBACK;
747  }
748
749  /** Returns true if the procedure is finished successfully. */
750  public synchronized boolean isSuccess() {
751    return state == ProcedureState.SUCCESS && !hasException();
752  }
753
754  /**
755   * @return true if the procedure is finished. The Procedure may be completed successfully or
756   *         rolledback.
757   */
758  public synchronized boolean isFinished() {
759    return isSuccess() || state == ProcedureState.ROLLEDBACK;
760  }
761
762  /** Returns true if the procedure is waiting for a child to finish or for an external event. */
763  public synchronized boolean isWaiting() {
764    switch (state) {
765      case WAITING:
766      case WAITING_TIMEOUT:
767        return true;
768      default:
769        break;
770    }
771    return false;
772  }
773
774  protected synchronized void setState(final ProcedureState state) {
775    this.state = state;
776    updateTimestamp();
777  }
778
779  public synchronized ProcedureState getState() {
780    return state;
781  }
782
783  protected void setFailure(final String source, final Throwable cause) {
784    setFailure(new RemoteProcedureException(source, cause));
785  }
786
787  protected synchronized void setFailure(final RemoteProcedureException exception) {
788    this.exception = exception;
789    if (!isFinished()) {
790      setState(ProcedureState.FAILED);
791    }
792  }
793
794  protected void setAbortFailure(final String source, final String msg) {
795    setFailure(source, new ProcedureAbortedException(msg));
796  }
797
798  /**
799   * Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
800   * <p/>
801   * Another usage for this method is to implement retrying. A procedure can set the state to
802   * {@code WAITING_TIMEOUT} by calling {@code setState} method, and throw a
803   * {@link ProcedureSuspendedException} to halt the execution of the procedure, and do not forget a
804   * call {@link #setTimeout(int)} method to set the timeout. And you should also override this
805   * method to wake up the procedure, and also return false to tell the ProcedureExecutor that the
806   * timeout event has been handled.
807   * @return true to let the framework handle the timeout as abort, false in case the procedure
808   *         handled the timeout itself.
809   */
810  protected synchronized boolean setTimeoutFailure(TEnvironment env) {
811    if (state == ProcedureState.WAITING_TIMEOUT) {
812      long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
813      setFailure("ProcedureExecutor",
814        new TimeoutIOException("Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
815      return true;
816    }
817    return false;
818  }
819
820  public synchronized boolean hasException() {
821    return exception != null;
822  }
823
824  public synchronized RemoteProcedureException getException() {
825    return exception;
826  }
827
828  /**
829   * Called by the ProcedureExecutor on procedure-load to restore the latch state
830   */
831  protected synchronized void setChildrenLatch(int numChildren) {
832    this.childrenLatch = numChildren;
833    if (LOG.isTraceEnabled()) {
834      LOG.trace("CHILD LATCH INCREMENT SET " + this.childrenLatch, new Throwable(this.toString()));
835    }
836  }
837
838  /**
839   * Called by the ProcedureExecutor on procedure-load to restore the latch state
840   */
841  protected synchronized void incChildrenLatch() {
842    // TODO: can this be inferred from the stack? I think so...
843    this.childrenLatch++;
844    if (LOG.isTraceEnabled()) {
845      LOG.trace("CHILD LATCH INCREMENT " + this.childrenLatch, new Throwable(this.toString()));
846    }
847  }
848
849  /**
850   * Called by the ProcedureExecutor to notify that one of the sub-procedures has completed.
851   */
852  private synchronized boolean childrenCountDown() {
853    assert childrenLatch > 0 : this;
854    boolean b = --childrenLatch == 0;
855    if (LOG.isTraceEnabled()) {
856      LOG.trace("CHILD LATCH DECREMENT " + childrenLatch, new Throwable(this.toString()));
857    }
858    return b;
859  }
860
861  /**
862   * Try to set this procedure into RUNNABLE state. Succeeds if all subprocedures/children are done.
863   * @return True if we were able to move procedure to RUNNABLE state.
864   */
865  synchronized boolean tryRunnable() {
866    // Don't use isWaiting in the below; it returns true for WAITING and WAITING_TIMEOUT
867    if (getState() == ProcedureState.WAITING && childrenCountDown()) {
868      setState(ProcedureState.RUNNABLE);
869      return true;
870    } else {
871      return false;
872    }
873  }
874
875  protected synchronized boolean hasChildren() {
876    return childrenLatch > 0;
877  }
878
879  protected synchronized int getChildrenLatch() {
880    return childrenLatch;
881  }
882
883  /**
884   * Called by the RootProcedureState on procedure execution. Each procedure store its stack-index
885   * positions.
886   */
887  protected synchronized void addStackIndex(final int index) {
888    if (stackIndexes == null) {
889      stackIndexes = new int[] { index };
890    } else {
891      int count = stackIndexes.length;
892      stackIndexes = Arrays.copyOf(stackIndexes, count + 1);
893      stackIndexes[count] = index;
894    }
895    wasExecuted = true;
896  }
897
898  protected synchronized boolean removeStackIndex() {
899    if (stackIndexes != null && stackIndexes.length > 1) {
900      stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
901      return false;
902    } else {
903      stackIndexes = null;
904      return true;
905    }
906  }
907
908  /**
909   * Called on store load to initialize the Procedure internals after the creation/deserialization.
910   */
911  protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
912    this.stackIndexes = new int[stackIndexes.size()];
913    for (int i = 0; i < this.stackIndexes.length; ++i) {
914      this.stackIndexes[i] = stackIndexes.get(i);
915    }
916    // for backward compatible, where a procedure is serialized before we added the executed flag,
917    // the flag will be false so we need to set the wasExecuted flag here
918    this.wasExecuted = true;
919  }
920
921  protected synchronized void setExecuted() {
922    this.wasExecuted = true;
923  }
924
925  public synchronized boolean wasExecuted() {
926    return wasExecuted;
927  }
928
929  protected synchronized int[] getStackIndexes() {
930    return stackIndexes;
931  }
932
933  /**
934   * Return whether the procedure supports rollback. If the procedure does not support rollback, we
935   * can skip the rollback state management which could increase the performance. See HBASE-28210
936   * and HBASE-28212.
937   */
938  protected boolean isRollbackSupported() {
939    return true;
940  }
941
942  // ==========================================================================
943  // Internal methods - called by the ProcedureExecutor
944  // ==========================================================================
945
946  /**
947   * Internal method called by the ProcedureExecutor that starts the user-level code execute().
948   * @throws ProcedureSuspendedException This is used when procedure wants to halt processing and
949   *                                     skip out without changing states or releasing any locks
950   *                                     held.
951   */
952  protected Procedure<TEnvironment>[] doExecute(TEnvironment env)
953    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
954    try {
955      updateTimestamp();
956      if (bypass) {
957        LOG.info("{} bypassed, returning null to finish it", this);
958        return null;
959      }
960      return execute(env);
961    } finally {
962      updateTimestamp();
963    }
964  }
965
966  /**
967   * Internal method called by the ProcedureExecutor that starts the user-level code rollback().
968   */
969  protected void doRollback(TEnvironment env) throws IOException, InterruptedException {
970    try {
971      updateTimestamp();
972      if (bypass) {
973        LOG.info("{} bypassed, skipping rollback", this);
974        return;
975      }
976      rollback(env);
977    } finally {
978      updateTimestamp();
979    }
980  }
981
982  final void restoreLock(TEnvironment env) {
983    if (!lockedWhenLoading) {
984      LOG.debug("{} didn't hold the lock before restarting, skip acquiring lock.", this);
985      return;
986    }
987
988    if (isFinished()) {
989      LOG.debug("{} is already finished, skip acquiring lock.", this);
990      return;
991    }
992
993    if (isBypass()) {
994      LOG.debug("{} is already bypassed, skip acquiring lock.", this);
995      return;
996    }
997    // this can happen if the parent stores the sub procedures but before it can
998    // release its lock, the master restarts
999    if (getState() == ProcedureState.WAITING && !holdLock(env)) {
1000      LOG.debug("{} is in WAITING STATE, and holdLock=false, skip acquiring lock.", this);
1001      lockedWhenLoading = false;
1002      return;
1003    }
1004    LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this);
1005    LockState state = acquireLock(env);
1006    assert state == LockState.LOCK_ACQUIRED;
1007  }
1008
1009  /**
1010   * Internal method called by the ProcedureExecutor that starts the user-level code acquireLock().
1011   */
1012  final LockState doAcquireLock(TEnvironment env, ProcedureStore store) {
1013    if (waitInitialized(env)) {
1014      return LockState.LOCK_EVENT_WAIT;
1015    }
1016    if (lockedWhenLoading) {
1017      // reset it so we will not consider it anymore
1018      lockedWhenLoading = false;
1019      locked = true;
1020      // Here we return without persist the locked state, as lockedWhenLoading is true means
1021      // that the locked field of the procedure stored in procedure store is true, so we do not need
1022      // to store it again.
1023      return LockState.LOCK_ACQUIRED;
1024    }
1025    LockState state = acquireLock(env);
1026    if (state == LockState.LOCK_ACQUIRED) {
1027      locked = true;
1028      // persist that we have held the lock. This must be done before we actually execute the
1029      // procedure, otherwise when restarting, we may consider the procedure does not have a lock,
1030      // but it may have already done some changes as we have already executed it, and if another
1031      // procedure gets the lock, then the semantic will be broken if the holdLock is true, as we do
1032      // not expect that another procedure can be executed in the middle.
1033      store.update(this);
1034    }
1035    return state;
1036  }
1037
1038  /**
1039   * Internal method called by the ProcedureExecutor that starts the user-level code releaseLock().
1040   */
1041  final void doReleaseLock(TEnvironment env, ProcedureStore store) {
1042    locked = false;
1043    // persist that we have released the lock. This must be done before we actually release the
1044    // lock. Another procedure may take this lock immediately after we release the lock, and if we
1045    // crash before persist the information that we have already released the lock, then when
1046    // restarting there will be two procedures which both have the lock and cause problems.
1047    if (getState() != ProcedureState.ROLLEDBACK) {
1048      // If the state is ROLLEDBACK, it means that we have already deleted the procedure from
1049      // procedure store, so do not need to log the release operation any more.
1050      store.update(this);
1051    }
1052    releaseLock(env);
1053  }
1054
1055  protected final ProcedureSuspendedException suspend(int timeoutMillis, boolean jitter)
1056    throws ProcedureSuspendedException {
1057    if (jitter) {
1058      // 10% possible jitter
1059      double add = (double) timeoutMillis * ThreadLocalRandom.current().nextDouble(0.1);
1060      timeoutMillis += add;
1061    }
1062    setTimeout(timeoutMillis);
1063    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
1064    skipPersistence();
1065    throw new ProcedureSuspendedException();
1066  }
1067
1068  @Override
1069  public int compareTo(final Procedure<TEnvironment> other) {
1070    return Long.compare(getProcId(), other.getProcId());
1071  }
1072
1073  // ==========================================================================
1074  // misc utils
1075  // ==========================================================================
1076
1077  /**
1078   * Get an hashcode for the specified Procedure ID
1079   * @return the hashcode for the specified procId
1080   */
1081  public static long getProcIdHashCode(long procId) {
1082    long h = procId;
1083    h ^= h >> 16;
1084    h *= 0x85ebca6b;
1085    h ^= h >> 13;
1086    h *= 0xc2b2ae35;
1087    h ^= h >> 16;
1088    return h;
1089  }
1090
1091  /**
1092   * Helper to lookup the root Procedure ID given a specified procedure.
1093   */
1094  protected static <T> Long getRootProcedureId(Map<Long, Procedure<T>> procedures,
1095    Procedure<T> proc) {
1096    while (proc.hasParent()) {
1097      proc = procedures.get(proc.getParentProcId());
1098      if (proc == null) {
1099        return null;
1100      }
1101    }
1102    return proc.getProcId();
1103  }
1104
1105  /**
1106   * @param a the first procedure to be compared.
1107   * @param b the second procedure to be compared.
1108   * @return true if the two procedures have the same parent
1109   */
1110  public static boolean haveSameParent(Procedure<?> a, Procedure<?> b) {
1111    return a.hasParent() && b.hasParent() && (a.getParentProcId() == b.getParentProcId());
1112  }
1113}