001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import edu.umd.cs.findbugs.annotations.Nullable;
021import java.io.IOException;
022import java.io.UncheckedIOException;
023import java.util.ArrayDeque;
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.Collection;
027import java.util.Comparator;
028import java.util.Deque;
029import java.util.HashSet;
030import java.util.List;
031import java.util.PriorityQueue;
032import java.util.Set;
033import java.util.concurrent.ConcurrentHashMap;
034import java.util.concurrent.CopyOnWriteArrayList;
035import java.util.concurrent.ExecutorService;
036import java.util.concurrent.Executors;
037import java.util.concurrent.LinkedBlockingQueue;
038import java.util.concurrent.ThreadFactory;
039import java.util.concurrent.ThreadPoolExecutor;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicBoolean;
042import java.util.concurrent.atomic.AtomicInteger;
043import java.util.concurrent.atomic.AtomicLong;
044import java.util.stream.Collectors;
045import java.util.stream.Stream;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
049import org.apache.hadoop.hbase.log.HBaseMarkers;
050import org.apache.hadoop.hbase.procedure2.Procedure.LockState;
051import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
052import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
053import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
054import org.apache.hadoop.hbase.procedure2.trace.ProcedureSpanBuilder;
055import org.apache.hadoop.hbase.procedure2.util.StringUtils;
056import org.apache.hadoop.hbase.security.User;
057import org.apache.hadoop.hbase.trace.TraceUtil;
058import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
059import org.apache.hadoop.hbase.util.IdLock;
060import org.apache.hadoop.hbase.util.NonceKey;
061import org.apache.hadoop.hbase.util.Threads;
062import org.apache.yetus.audience.InterfaceAudience;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
067import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
068
069import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
070
/**
 * Thread Pool that executes the submitted procedures. The executor has a ProcedureStore associated.
 * Each operation is logged and on restart the pending procedures are resumed. Unless the Procedure
 * code throws an error (e.g. invalid user input) the procedure will complete (at some point in
 * time). On restart the pending procedures are resumed and the ones that failed will be rolled
 * back. The user can add procedures to the executor via submitProcedure(proc), check for the
 * finished state via isFinished(procId), and get the result via getResult(procId).
 */
079@InterfaceAudience.Private
080public class ProcedureExecutor<TEnvironment> {
  private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class);

  // Boolean flag read once in the constructor into the checkOwnerSet field.
  // NOTE(review): the enforcement it enables lives outside this chunk — confirm its exact meaning.
  public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set";
  private static final boolean DEFAULT_CHECK_OWNER_SET = false;

  // Worker keep-alive time; the key name suggests milliseconds (default is 1 minute in millis).
  // Presumably read into keepAliveTime by refreshConfiguration — TODO confirm.
  public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY =
    "hbase.procedure.worker.keep.alive.time.msec";
  private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1);

  // TTL (ms) passed to CompletedProcedureRetainer#isExpired when deciding whether a completed
  // procedure is still worth force-updating.
  public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
  static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min

  // TTL (ms) for completed procedures whose result has been acknowledged; also passed to
  // CompletedProcedureRetainer#isExpired.
  public static final String EVICT_ACKED_TTL_CONF_KEY = "hbase.procedure.cleaner.acked.evict.ttl";
  static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min

  /**
   * {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to break PE
   * having it fail at various junctures. When non-null, testing is set to an instance of the below
   * internal {@link Testing} class with flags set for the particular test.
   */
  volatile Testing testing = null;
102
103  /**
104   * Class with parameters describing how to fail/die when in testing-context.
105   */
106  public static class Testing {
107    protected volatile boolean killIfHasParent = true;
108    protected volatile boolean killIfSuspended = false;
109
110    /**
111     * Kill the PE BEFORE we store state to the WAL. Good for figuring out if a Procedure is
112     * persisting all the state it needs to recover after a crash.
113     */
114    protected volatile boolean killBeforeStoreUpdate = false;
115    protected volatile boolean toggleKillBeforeStoreUpdate = false;
116
117    /**
118     * Set when we want to fail AFTER state has been stored into the WAL. Rarely used. HBASE-20978
119     * is about a case where memory-state was being set after store to WAL where a crash could cause
120     * us to get stuck. This flag allows killing at what was a vulnerable time.
121     */
122    protected volatile boolean killAfterStoreUpdate = false;
123    protected volatile boolean toggleKillAfterStoreUpdate = false;
124
125    protected volatile boolean killBeforeStoreUpdateInRollback = false;
126    protected volatile boolean toggleKillBeforeStoreUpdateInRollback = false;
127
128    protected boolean shouldKillBeforeStoreUpdate() {
129      final boolean kill = this.killBeforeStoreUpdate;
130      if (this.toggleKillBeforeStoreUpdate) {
131        this.killBeforeStoreUpdate = !kill;
132        LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate);
133      }
134      return kill;
135    }
136
137    protected boolean shouldKillBeforeStoreUpdate(boolean isSuspended, boolean hasParent) {
138      if (isSuspended && !killIfSuspended) {
139        return false;
140      }
141      if (hasParent && !killIfHasParent) {
142        return false;
143      }
144      return shouldKillBeforeStoreUpdate();
145    }
146
147    protected boolean shouldKillAfterStoreUpdate() {
148      final boolean kill = this.killAfterStoreUpdate;
149      if (this.toggleKillAfterStoreUpdate) {
150        this.killAfterStoreUpdate = !kill;
151        LOG.warn("Toggle KILL after store update to: " + this.killAfterStoreUpdate);
152      }
153      return kill;
154    }
155
156    protected boolean shouldKillAfterStoreUpdate(final boolean isSuspended) {
157      return (isSuspended && !killIfSuspended) ? false : shouldKillAfterStoreUpdate();
158    }
159
160    protected boolean shouldKillBeforeStoreUpdateInRollback() {
161      final boolean kill = this.killBeforeStoreUpdateInRollback;
162      if (this.toggleKillBeforeStoreUpdateInRollback) {
163        this.killBeforeStoreUpdateInRollback = !kill;
164        LOG.warn("Toggle KILL before store update in rollback to: "
165          + this.killBeforeStoreUpdateInRollback);
166      }
167      return kill;
168    }
169  }
170
  /**
   * Observer of procedure lifecycle events, keyed by procedure id.
   * NOTE(review): the exact timing of each callback is defined by the send*Notification helpers
   * (e.g. sendProcedureLoadedNotification) — confirm there.
   */
  public interface ProcedureExecutorListener {
    // Presumably fired for root procedures re-loaded from the store at startup.
    void procedureLoaded(long procId);

    // Presumably fired when a procedure is submitted/added — TODO confirm.
    void procedureAdded(long procId);

    // Presumably fired when a procedure completes — TODO confirm.
    void procedureFinished(long procId);
  }
178
  /**
   * Maps the procId returned by submitProcedure(), the Root-ProcID, to the retainer holding the
   * completed Procedure. Once a Root-Procedure completes (success or failure), the result will be
   * added to this map; finished procedures found while loading the store are added here too. The
   * user of ProcedureExecutor should call getResult(procId) to get the result.
   */
  private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed =
    new ConcurrentHashMap<>();

  /**
   * Maps the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
   * The RootProcedureState contains the execution stack of the Root-Procedure. It is added to the
   * map by submitProcedure() (and by loadProcedures() on replay) and removed on procedure
   * completion.
   */
  private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack =
    new ConcurrentHashMap<>();

  /**
   * Helper map to look up the live procedures by ID. This map contains every procedure: both
   * root-procedures and subprocedures.
   */
  private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures =
    new ConcurrentHashMap<>();

  /**
   * Helper map to look up whether a procedure was already issued by the same client (by nonce).
   * This map contains every root procedure.
   */
  private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>();

  // Registered ProcedureExecutorListeners; copy-on-write so they can be iterated without locking.
  private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
    new CopyOnWriteArrayList<>();

  // Configuration passed to the constructor; also read lazily (e.g. evict TTLs on force update).
  private Configuration conf;

  /**
   * Created in the {@link #init(int, boolean)} method. All executor-created threads (workers, the
   * timer threads and the executor-service threads) belong to this group. {@link #join()} does not
   * destroy it; it only logs any threads still active in it. Overridden when we do the
   * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok).
   */
  private ThreadGroup threadGroup;

  /**
   * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
   * resource handling rather than observing in a #join is unexpected). Overridden when we do the
   * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok).
   */
  private CopyOnWriteArrayList<WorkerThread> workerThreads;

  /**
   * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
   * resource handling rather than observing in a #join is unexpected). Overridden when we do the
   * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok).
   */
  private TimeoutExecutorThread<TEnvironment> timeoutExecutor;

  /**
   * WorkerMonitor checks for stuck workers and creates new worker threads when necessary; for
   * example, if there is no worker to assign meta it will create a new worker thread for it, so it
   * is very important. TimeoutExecutor executes many tasks like DeadServerMetricRegionChore,
   * RegionInTransitionChore and so on, and some of them may run for a long time and block other
   * tasks like WorkerMonitor, so we use a dedicated thread for executing WorkerMonitor.
   */
  private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor;

  // Single-thread executor servicing ProcedureStoreListener#forceUpdate requests. Created in
  // init(), shut down in stop(), awaited in join().
  private ExecutorService forceUpdateExecutor;

  // A thread pool for executing some asynchronous tasks for procedures, you can find references to
  // getAsyncTaskExecutor to see the usage
  private ExecutorService asyncTaskExecutor;

  // Set in init(): core = numThreads, max (burst) = 10 * numThreads.
  private int corePoolSize;
  private int maxPoolSize;

  // NOTE(review): presumably refreshed from WORKER_KEEP_ALIVE_TIME_CONF_KEY by
  // refreshConfiguration — confirm.
  private volatile long keepAliveTime;

  /**
   * Scheduler/Queue that contains runnable procedures.
   */
  private final ProcedureScheduler scheduler;

  // Highest procedure id allocated so far; -1 until the store reports setMaxProcId() on load.
  private final AtomicLong lastProcId = new AtomicLong(-1);
  // Reset to 0 in init(); presumably used to number worker threads — TODO confirm.
  private final AtomicLong workerId = new AtomicLong(0);
  // NOTE(review): presumably the number of workers currently executing a procedure — confirm.
  private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
  // True between startWorkers() and stop().
  private final AtomicBoolean running = new AtomicBoolean(false);
  private final TEnvironment environment;
  private final ProcedureStore store;

  // Value of CHECK_OWNER_SET_CONF_KEY captured in the constructor.
  private final boolean checkOwnerSet;

  // To prevent concurrent execution of the same procedure.
  // For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the
  // procedure is woken up before we finish the suspend which causes the same procedures to be
  // executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also
  // a bit confusing to the developers. So here we introduce this lock to prevent the concurrent
  // execution of the same procedure.
  // The same lock is also taken by forceUpdateProcedure and handed to CompletedProcedureCleaner.
  private final IdLock procExecutionLock = new IdLock();
275
276  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
277    final ProcedureStore store) {
278    this(conf, environment, store, new SimpleProcedureScheduler());
279  }
280
281  private boolean isRootFinished(Procedure<?> proc) {
282    Procedure<?> rootProc = procedures.get(proc.getRootProcId());
283    return rootProc == null || rootProc.isFinished();
284  }
285
286  private void forceUpdateProcedure(long procId) throws IOException {
287    IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
288    try {
289      Procedure<TEnvironment> proc = procedures.get(procId);
290      if (proc != null) {
291        if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) {
292          LOG.debug("Procedure {} has already been finished and parent is succeeded,"
293            + " skip force updating", proc);
294          return;
295        }
296      } else {
297        CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
298        if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) {
299          LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
300          return;
301        }
302        long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
303        long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
304        if (retainer.isExpired(EnvironmentEdgeManager.currentTime(), evictTtl, evictAckTtl)) {
305          LOG.debug("Procedure {} has already been finished and expired, skip force updating",
306            procId);
307          return;
308        }
309        proc = retainer.getProcedure();
310      }
311      LOG.debug("Force update procedure {}", proc);
312      store.update(proc);
313    } finally {
314      procExecutionLock.releaseLockEntry(lockEntry);
315    }
316  }
317
318  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
319    final ProcedureStore store, final ProcedureScheduler scheduler) {
320    this.environment = environment;
321    this.scheduler = scheduler;
322    this.store = store;
323    this.conf = conf;
324    this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET);
325    refreshConfiguration(conf);
326  }
327
328  private void load(final boolean abortOnCorruption) throws IOException {
329    Preconditions.checkArgument(completed.isEmpty(), "completed not empty: %s", completed);
330    Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty: %s",
331      rollbackStack);
332    Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty: %s", procedures);
333    Preconditions.checkArgument(scheduler.size() == 0, "scheduler queue not empty: %s", scheduler);
334
335    store.load(new ProcedureStore.ProcedureLoader() {
336      @Override
337      public void setMaxProcId(long maxProcId) {
338        assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
339        lastProcId.set(maxProcId);
340      }
341
342      @Override
343      public void load(ProcedureIterator procIter) throws IOException {
344        loadProcedures(procIter);
345      }
346
347      @Override
348      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
349        int corruptedCount = 0;
350        while (procIter.hasNext()) {
351          Procedure<?> proc = procIter.next();
352          LOG.error("Corrupt " + proc);
353          corruptedCount++;
354        }
355        if (abortOnCorruption && corruptedCount > 0) {
356          throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay");
357        }
358      }
359    });
360  }
361
362  private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) {
363    proc.restoreLock(getEnvironment());
364    restored.add(proc.getProcId());
365  }
366
367  private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) {
368    while (!stack.isEmpty()) {
369      restoreLock(stack.pop(), restored);
370    }
371  }
372
373  // Restore the locks for all the procedures.
374  // Notice that we need to restore the locks starting from the root proc, otherwise there will be
375  // problem that a sub procedure may hold the exclusive lock first and then we are stuck when
376  // calling the acquireLock method for the parent procedure.
377  // The algorithm is straight-forward:
378  // 1. Use a set to record the procedures which locks have already been restored.
379  // 2. Use a stack to store the hierarchy of the procedures
380  // 3. For all the procedure, we will first try to find its parent and push it into the stack,
381  // unless
382  // a. We have no parent, i.e, we are the root procedure
383  // b. The lock has already been restored(by checking the set introduced in #1)
384  // then we start to pop the stack and call acquireLock for each procedure.
385  // Notice that this should be done for all procedures, not only the ones in runnableList.
386  private void restoreLocks() {
387    Set<Long> restored = new HashSet<>();
388    Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>();
389    procedures.values().forEach(proc -> {
390      for (;;) {
391        if (restored.contains(proc.getProcId())) {
392          restoreLocks(stack, restored);
393          return;
394        }
395        if (!proc.hasParent()) {
396          restoreLock(proc, restored);
397          restoreLocks(stack, restored);
398          return;
399        }
400        stack.push(proc);
401        proc = procedures.get(proc.getParentProcId());
402      }
403    });
404  }
405
  /**
   * Second pass over the store iterator: for every unfinished procedure, rebuild its root's
   * execution stack and sort it into a list by its persisted state. Finally, decide per root
   * whether rollback is supported.
   * @param procIter           store iterator; it is reset before iterating
   * @param runnableList       receives RUNNABLE procedures
   * @param failedList         receives FAILED procedures
   * @param waitingList        receives WAITING procedures
   * @param waitingTimeoutList receives WAITING_TIMEOUT procedures
   * @throws IOException propagated from the iterator
   * @throws UnsupportedOperationException if a procedure is in ROLLEDBACK or INITIALIZING state
   */
  private void initializeStacks(ProcedureIterator procIter,
    List<Procedure<TEnvironment>> runnableList, List<Procedure<TEnvironment>> failedList,
    List<Procedure<TEnvironment>> waitingList, List<Procedure<TEnvironment>> waitingTimeoutList)
    throws IOException {
    procIter.reset();
    while (procIter.hasNext()) {
      // Completed procedures were already moved to the completed map in the first pass.
      if (procIter.isNextFinished()) {
        procIter.skipNext();
        continue;
      }

      @SuppressWarnings("unchecked")
      Procedure<TEnvironment> proc = procIter.next();
      assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
      LOG.debug("Loading {}", proc);
      Long rootProcId = getRootProcedureId(proc);
      // The orphan procedures will be passed to handleCorrupted, so add an assert here
      assert rootProcId != null;

      // An unfinished child keeps its parent waiting, so re-count it in the parent's latch.
      if (proc.hasParent()) {
        Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId());
        if (parent != null && !proc.isFinished()) {
          parent.incChildrenLatch();
        }
      }

      // The root's RootProcedureState was created in the first pass of loadProcedures.
      RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
      procStack.loadStack(proc);

      proc.setRootProcId(rootProcId);
      switch (proc.getState()) {
        case RUNNABLE:
          runnableList.add(proc);
          break;
        case WAITING:
          waitingList.add(proc);
          break;
        case WAITING_TIMEOUT:
          waitingTimeoutList.add(proc);
          break;
        case FAILED:
          failedList.add(proc);
          break;
        case ROLLEDBACK:
        case INITIALIZING:
          String msg = "Unexpected " + proc.getState() + " state for " + proc;
          LOG.error(msg);
          throw new UnsupportedOperationException(msg);
        default:
          break;
      }
    }
    rollbackStack.forEach((rootProcId, procStack) -> {
      if (procStack.getSubproceduresStack() != null) {
        // if we have already record some stack ids, it means we support rollback
        procStack.setRollbackSupported(true);
      } else {
        // otherwise, test the root procedure to see if we support rollback
        procStack.setRollbackSupported(procedures.get(rootProcId).isRollbackSupported());
      }
    });
  }
468
469  private void processWaitingProcedures(List<Procedure<TEnvironment>> waitingList,
470    List<Procedure<TEnvironment>> runnableList) {
471    waitingList.forEach(proc -> {
472      if (!proc.hasChildren()) {
473        // Normally, WAITING procedures should be waken by its children. But, there is a case that,
474        // all the children are successful and before they can wake up their parent procedure, the
475        // master was killed. So, during recovering the procedures from ProcedureWal, its children
476        // are not loaded because of their SUCCESS state. So we need to continue to run this WAITING
477        // procedure. But before executing, we need to set its state to RUNNABLE, otherwise, a
478        // exception will throw:
479        // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
480        // "NOT RUNNABLE! " + procedure.toString());
481        proc.setState(ProcedureState.RUNNABLE);
482        runnableList.add(proc);
483      } else {
484        proc.afterReplay(getEnvironment());
485      }
486    });
487  }
488
489  private void processWaitingTimeoutProcedures(List<Procedure<TEnvironment>> waitingTimeoutList) {
490    waitingTimeoutList.forEach(proc -> {
491      proc.afterReplay(getEnvironment());
492      timeoutExecutor.add(proc);
493    });
494  }
495
496  private void pushProceduresAfterLoad(List<Procedure<TEnvironment>> runnableList,
497    List<Procedure<TEnvironment>> failedList) {
498    failedList.forEach(scheduler::addBack);
499    // Put the procedures which have been executed first
500    // For table procedures, to prevent concurrent modifications, we only allow one procedure to run
501    // for a single table at the same time, this is done via inserting a waiting queue before
502    // actually add the procedure to run queue. So when loading here, we should add the procedures
503    // which have been executed first, otherwise another procedure which was in the waiting queue
504    // before restarting may be added to run queue first and still cause concurrent modifications.
505    // See HBASE-28263 for the reason why we need this
506    runnableList.sort((p1, p2) -> {
507      if (p1.wasExecuted()) {
508        if (p2.wasExecuted()) {
509          return Long.compare(p1.getProcId(), p2.getProcId());
510        } else {
511          return -1;
512        }
513      } else {
514        if (p2.wasExecuted()) {
515          return 1;
516        } else {
517          return Long.compare(p1.getProcId(), p2.getProcId());
518        }
519      }
520    });
521    runnableList.forEach(p -> {
522      p.afterReplay(getEnvironment());
523      if (!p.hasParent()) {
524        sendProcedureLoadedNotification(p.getProcId());
525      }
526      scheduler.addBack(p);
527    });
528  }
529
  /**
   * Rebuilds the executor's in-memory state from the store iterator: completed procedures go to
   * {@code completed}, unfinished ones to {@code procedures} (plus a RootProcedureState per root),
   * and nonce keys are recorded for both. The unfinished procedures are then replayed into the
   * timeout executor and the scheduler, in the steps described below.
   * @param procIter iterator over every procedure in the store; it is walked twice
   * @throws IOException propagated from the iterator
   */
  private void loadProcedures(ProcedureIterator procIter) throws IOException {
    // 1. Build the rollback stack
    // Count procedures per state so the lists in step 2 can be presized.
    int runnableCount = 0;
    int failedCount = 0;
    int waitingCount = 0;
    int waitingTimeoutCount = 0;
    while (procIter.hasNext()) {
      boolean finished = procIter.isNextFinished();
      @SuppressWarnings("unchecked")
      Procedure<TEnvironment> proc = procIter.next();
      NonceKey nonceKey = proc.getNonceKey();
      long procId = proc.getProcId();

      if (finished) {
        completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc));
        LOG.debug("Completed {}", proc);
      } else {
        if (!proc.hasParent()) {
          assert !proc.isFinished() : "unexpected finished procedure";
          rollbackStack.put(proc.getProcId(), new RootProcedureState<>());
        }

        // add the procedure to the map
        proc.beforeReplay(getEnvironment());
        procedures.put(proc.getProcId(), proc);
        switch (proc.getState()) {
          case RUNNABLE:
            runnableCount++;
            break;
          case FAILED:
            failedCount++;
            break;
          case WAITING:
            waitingCount++;
            break;
          case WAITING_TIMEOUT:
            waitingTimeoutCount++;
            break;
          default:
            break;
        }
      }

      // Nonces are recorded for finished procedures too, so a client retry maps to the same id.
      if (nonceKey != null) {
        nonceKeysToProcIdsMap.put(nonceKey, procId); // add the nonce to the map
      }
    }

    // 2. Initialize the stacks: In the old implementation, for procedures in FAILED state, we will
    // push it into the ProcedureScheduler directly to execute the rollback. But this does not work
    // after we introduce the restore lock stage. For now, when we acquire a xlock, we will remove
    // the queue from runQueue in scheduler, and then when a procedure which has lock access, for
    // example, a sub procedure of the procedure which has the xlock, is pushed into the scheduler,
    // we will add the queue back to let the workers poll from it. The assumption here is that, the
    // procedure which has the xlock should have been polled out already, so when loading we can not
    // add the procedure to scheduler first and then call acquireLock, since the procedure is still
    // in the queue, and since we will remove the queue from runQueue, then no one can poll it out,
    // then there is a dead lock
    List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);
    List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
    List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);
    List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount);

    initializeStacks(procIter, runnableList, failedList, waitingList, waitingTimeoutList);

    // 3. Check the waiting procedures to see if some of them can be added to runnable.
    processWaitingProcedures(waitingList, runnableList);

    // 4. restore locks
    restoreLocks();

    // 5. Push the procedures to the timeout executor
    processWaitingTimeoutProcedures(waitingTimeoutList);

    // 6. Push the procedure to the scheduler
    pushProceduresAfterLoad(runnableList, failedList);
    // After all procedures put into the queue, signal the worker threads.
    // Otherwise, there is a race condition. See HBASE-21364.
    scheduler.signalAll();
  }
610
611  /**
612   * Initialize the procedure executor, but do not start workers. We will start them later.
613   * <p/>
614   * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
615   * ensure a single executor, and start the procedure replay to resume and recover the previous
616   * pending and in-progress procedures.
617   * @param numThreads        number of threads available for procedure execution.
618   * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
619   *                          is found on replay. otherwise false.
620   */
621  public void init(int numThreads, boolean abortOnCorruption) throws IOException {
622    // We have numThreads executor + one timer thread used for timing out
623    // procedures and triggering periodic procedures.
624    this.corePoolSize = numThreads;
625    this.maxPoolSize = 10 * numThreads;
626    LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}",
627      corePoolSize, maxPoolSize);
628
629    this.threadGroup = new ThreadGroup("PEWorkerGroup");
630    this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout");
631    this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor");
632    ThreadFactory backingThreadFactory = new ThreadFactory() {
633
634      @Override
635      public Thread newThread(Runnable r) {
636        return new Thread(threadGroup, r);
637      }
638    };
639    int size = Math.max(2, Runtime.getRuntime().availableProcessors());
640    ThreadPoolExecutor executor =
641      new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(),
642        new ThreadFactoryBuilder().setDaemon(true)
643          .setNameFormat(getClass().getSimpleName() + "-Async-Task-Executor-%d")
644          .setThreadFactory(backingThreadFactory).build());
645    executor.allowCoreThreadTimeOut(true);
646    this.asyncTaskExecutor = executor;
647    forceUpdateExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true)
648      .setNameFormat("Force-Update-PEWorker-%d").setThreadFactory(backingThreadFactory).build());
649    store.registerListener(new ProcedureStoreListener() {
650
651      @Override
652      public void forceUpdate(long[] procIds) {
653        Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> {
654          try {
655            forceUpdateProcedure(procId);
656          } catch (IOException e) {
657            LOG.warn("Failed to force update procedure with pid={}", procId);
658          }
659        }));
660      }
661    });
662
663    // Create the workers
664    workerId.set(0);
665    workerThreads = new CopyOnWriteArrayList<>();
666    for (int i = 0; i < corePoolSize; ++i) {
667      workerThreads.add(new WorkerThread(threadGroup));
668    }
669
670    long st, et;
671
672    // Acquire the store lease.
673    st = System.nanoTime();
674    store.recoverLease();
675    et = System.nanoTime();
676    LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(),
677      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
678
679    // start the procedure scheduler
680    scheduler.start();
681
682    // TODO: Split in two steps.
683    // TODO: Handle corrupted procedures (currently just a warn)
684    // The first one will make sure that we have the latest id,
685    // so we can start the threads and accept new procedures.
686    // The second step will do the actual load of old procedures.
687    st = System.nanoTime();
688    load(abortOnCorruption);
689    et = System.nanoTime();
690    LOG.info("Loaded {} in {}", store.getClass().getSimpleName(),
691      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
692  }
693
694  /**
695   * Start the workers.
696   */
697  public void startWorkers() throws IOException {
698    if (!running.compareAndSet(false, true)) {
699      LOG.warn("Already running");
700      return;
701    }
702    // Start the executors. Here we must have the lastProcId set.
703    LOG.trace("Start workers {}", workerThreads.size());
704    timeoutExecutor.start();
705    workerMonitorExecutor.start();
706    for (WorkerThread worker : workerThreads) {
707      worker.start();
708    }
709
710    // Internal chores
711    workerMonitorExecutor.add(new WorkerMonitor());
712
713    // Add completed cleaner chore
714    addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,
715      nonceKeysToProcIdsMap));
716  }
717
  /**
   * Signals the executor to stop: marks it not running, stops the scheduler, sends stop signals to
   * both timer threads and shuts down the force-update and async-task executors. It does not wait
   * for anything to finish; {@link #join()} does the waiting.
   */
  public void stop() {
    // it is possible that we fail in init, while loading procedures, so we will not set running to
    // true but we should have already started the ProcedureScheduler, and also the two
    // ExecutorServices, so here we do not check running state, just stop them
    running.set(false);
    LOG.info("Stopping");
    scheduler.stop();
    timeoutExecutor.sendStopSignal();
    workerMonitorExecutor.sendStopSignal();
    // shutdown() lets already-submitted tasks run; join() awaits their termination.
    forceUpdateExecutor.shutdown();
    asyncTaskExecutor.shutdown();
  }
730
731  public void join() {
732    assert !isRunning() : "expected not running";
733
734    // stop the timeout executor
735    timeoutExecutor.awaitTermination();
736    // stop the work monitor executor
737    workerMonitorExecutor.awaitTermination();
738
739    // stop the worker threads
740    for (WorkerThread worker : workerThreads) {
741      worker.awaitTermination();
742    }
743    try {
744      if (!forceUpdateExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
745        LOG.warn("There are still pending tasks in forceUpdateExecutor");
746      }
747    } catch (InterruptedException e) {
748      LOG.warn("interrupted while waiting for forceUpdateExecutor termination", e);
749      Thread.currentThread().interrupt();
750    }
751    try {
752      if (!asyncTaskExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
753        LOG.warn("There are still pending tasks in asyncTaskExecutor");
754      }
755    } catch (InterruptedException e) {
756      LOG.warn("interrupted while waiting for asyncTaskExecutor termination", e);
757      Thread.currentThread().interrupt();
758    }
759
760    // log the still active threads, ThreadGroup.destroy is deprecated in JDK17 and it is not
761    // necessary for us to must destroy it here, so we just do a check and log
762    if (threadGroup.activeCount() > 0) {
763      LOG.error("There are still active thread in group {}, see STDOUT", threadGroup);
764      threadGroup.list();
765    }
766
767    // reset the in-memory state for testing
768    completed.clear();
769    rollbackStack.clear();
770    procedures.clear();
771    nonceKeysToProcIdsMap.clear();
772    scheduler.clear();
773    lastProcId.set(-1);
774  }
775
776  public void refreshConfiguration(final Configuration conf) {
777    this.conf = conf;
778    setKeepAliveTime(conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY, DEFAULT_WORKER_KEEP_ALIVE_TIME),
779      TimeUnit.MILLISECONDS);
780  }
781
782  // ==========================================================================
783  // Accessors
784  // ==========================================================================
785  public boolean isRunning() {
786    return running.get();
787  }
788
789  /** Returns the current number of worker threads. */
790  public int getWorkerThreadCount() {
791    return workerThreads.size();
792  }
793
794  /** Returns the core pool size settings. */
795  public int getCorePoolSize() {
796    return corePoolSize;
797  }
798
799  public int getActiveExecutorCount() {
800    return activeExecutorCount.get();
801  }
802
803  public TEnvironment getEnvironment() {
804    return this.environment;
805  }
806
807  public ProcedureStore getStore() {
808    return this.store;
809  }
810
811  ProcedureScheduler getScheduler() {
812    return scheduler;
813  }
814
815  public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) {
816    this.keepAliveTime = timeUnit.toMillis(keepAliveTime);
817    this.scheduler.signalAll();
818  }
819
820  public long getKeepAliveTime(final TimeUnit timeUnit) {
821    return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS);
822  }
823
824  // ==========================================================================
825  // Submit/Remove Chores
826  // ==========================================================================
827
828  /**
829   * Add a chore procedure to the executor
830   * @param chore the chore to add
831   */
832  public void addChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore) {
833    if (chore == null) {
834      return;
835    }
836    chore.setState(ProcedureState.WAITING_TIMEOUT);
837    timeoutExecutor.add(chore);
838  }
839
840  /**
841   * Remove a chore procedure from the executor
842   * @param chore the chore to remove
843   * @return whether the chore is removed, or it will be removed later
844   */
845  public boolean removeChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore) {
846    if (chore == null) {
847      return true;
848    }
849    chore.setState(ProcedureState.SUCCESS);
850    return timeoutExecutor.remove(chore);
851  }
852
853  // ==========================================================================
854  // Nonce Procedure helpers
855  // ==========================================================================
856  /**
857   * Create a NonceKey from the specified nonceGroup and nonce.
858   * @param nonceGroup the group to use for the {@link NonceKey}
859   * @param nonce      the nonce to use in the {@link NonceKey}
860   * @return the generated NonceKey
861   */
862  public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
863    return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce);
864  }
865
866  /**
867   * Register a nonce for a procedure that is going to be submitted. A procId will be reserved and
868   * on submitProcedure(), the procedure with the specified nonce will take the reserved ProcId. If
869   * someone already reserved the nonce, this method will return the procId reserved, otherwise an
870   * invalid procId will be returned. and the caller should procede and submit the procedure.
871   * @param nonceKey A unique identifier for this operation from the client or process.
872   * @return the procId associated with the nonce, if any otherwise an invalid procId.
873   */
874  public long registerNonce(final NonceKey nonceKey) {
875    if (nonceKey == null) {
876      return -1;
877    }
878
879    // check if we have already a Reserved ID for the nonce
880    Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey);
881    if (oldProcId == null) {
882      // reserve a new Procedure ID, this will be associated with the nonce
883      // and the procedure submitted with the specified nonce will use this ID.
884      final long newProcId = nextProcId();
885      oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId);
886      if (oldProcId == null) {
887        return -1;
888      }
889    }
890
891    // we found a registered nonce, but the procedure may not have been submitted yet.
892    // since the client expect the procedure to be submitted, spin here until it is.
893    final boolean traceEnabled = LOG.isTraceEnabled();
894    while (
895      isRunning() && !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId))
896        && nonceKeysToProcIdsMap.containsKey(nonceKey)
897    ) {
898      if (traceEnabled) {
899        LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted");
900      }
901      Threads.sleep(100);
902    }
903    return oldProcId.longValue();
904  }
905
906  /**
907   * Remove the NonceKey if the procedure was not submitted to the executor.
908   * @param nonceKey A unique identifier for this operation from the client or process.
909   */
910  public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) {
911    if (nonceKey == null) {
912      return;
913    }
914
915    final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
916    if (procId == null) {
917      return;
918    }
919
920    // if the procedure was not submitted, remove the nonce
921    if (!(procedures.containsKey(procId) || completed.containsKey(procId))) {
922      nonceKeysToProcIdsMap.remove(nonceKey);
923    }
924  }
925
926  /**
927   * If the failure failed before submitting it, we may want to give back the same error to the
928   * requests with the same nonceKey.
929   * @param nonceKey  A unique identifier for this operation from the client or process
930   * @param procName  name of the procedure, used to inform the user
931   * @param procOwner name of the owner of the procedure, used to inform the user
932   * @param exception the failure to report to the user
933   */
934  public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner,
935    IOException exception) {
936    if (nonceKey == null) {
937      return;
938    }
939
940    Long procId = nonceKeysToProcIdsMap.get(nonceKey);
941    if (procId == null || completed.containsKey(procId)) {
942      return;
943    }
944
945    completed.computeIfAbsent(procId, (key) -> {
946      Procedure<TEnvironment> proc =
947        new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception);
948
949      return new CompletedProcedureRetainer<>(proc);
950    });
951  }
952
953  // ==========================================================================
954  // Submit/Abort Procedure
955  // ==========================================================================
956  /**
957   * Add a new root-procedure to the executor.
958   * @param proc the new procedure to execute.
959   * @return the procedure id, that can be used to monitor the operation
960   */
961  public long submitProcedure(Procedure<TEnvironment> proc) {
962    return submitProcedure(proc, null);
963  }
964
965  /**
966   * Bypass a procedure. If the procedure is set to bypass, all the logic in execute/rollback will
967   * be ignored and it will return success, whatever. It is used to recover buggy stuck procedures,
968   * releasing the lock resources and letting other procedures run. Bypassing one procedure (and its
969   * ancestors will be bypassed automatically) may leave the cluster in a middle state, e.g. region
970   * not assigned, or some hdfs files left behind. After getting rid of those stuck procedures, the
971   * operators may have to do some clean up on hdfs or schedule some assign procedures to let region
972   * online. DO AT YOUR OWN RISK.
973   * <p>
974   * A procedure can be bypassed only if 1. The procedure is in state of RUNNABLE, WAITING,
975   * WAITING_TIMEOUT or it is a root procedure without any child. 2. No other worker thread is
976   * executing it 3. No child procedure has been submitted
977   * <p>
978   * If all the requirements are meet, the procedure and its ancestors will be bypassed and
979   * persisted to WAL.
980   * <p>
981   * If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue. TODO: What
982   * about WAITING_TIMEOUT?
983   * @param pids      the procedure id
984   * @param lockWait  time to wait lock
985   * @param force     if force set to true, we will bypass the procedure even if it is executing.
986   *                  This is for procedures which can't break out during executing(due to bug,
987   *                  mostly) In this case, bypassing the procedure is not enough, since it is
988   *                  already stuck there. We need to restart the master after bypassing, and
989   *                  letting the problematic procedure to execute wth bypass=true, so in that
990   *                  condition, the procedure can be successfully bypassed.
991   * @param recursive We will do an expensive search for children of each pid. EXPENSIVE!
992   * @return true if bypass success
993   * @throws IOException IOException
994   */
995  public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force,
996    boolean recursive) throws IOException {
997    List<Boolean> result = new ArrayList<Boolean>(pids.size());
998    for (long pid : pids) {
999      result.add(bypassProcedure(pid, lockWait, force, recursive));
1000    }
1001    return result;
1002  }
1003
1004  boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive)
1005    throws IOException {
1006    Preconditions.checkArgument(lockWait > 0, "lockWait should be positive");
1007    final Procedure<TEnvironment> procedure = getProcedure(pid);
1008    if (procedure == null) {
1009      LOG.debug("Procedure pid={} does not exist, skipping bypass", pid);
1010      return false;
1011    }
1012
1013    LOG.debug("Begin bypass {} with lockWait={}, override={}, recursive={}", procedure, lockWait,
1014      override, recursive);
1015    IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait);
1016    if (lockEntry == null && !override) {
1017      LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}", lockWait,
1018        procedure, override);
1019      return false;
1020    } else if (lockEntry == null) {
1021      LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}", lockWait,
1022        procedure, override);
1023    }
1024    try {
1025      // check whether the procedure is already finished
1026      if (procedure.isFinished()) {
1027        LOG.debug("{} is already finished, skipping bypass", procedure);
1028        return false;
1029      }
1030
1031      if (procedure.hasChildren()) {
1032        if (recursive) {
1033          // EXPENSIVE. Checks each live procedure of which there could be many!!!
1034          // Is there another way to get children of a procedure?
1035          LOG.info("Recursive bypass on children of pid={}", procedure.getProcId());
1036          this.procedures.forEachValue(1 /* Single-threaded */,
1037            // Transformer
1038            v -> v.getParentProcId() == procedure.getProcId() ? v : null,
1039            // Consumer
1040            v -> {
1041              try {
1042                bypassProcedure(v.getProcId(), lockWait, override, recursive);
1043              } catch (IOException e) {
1044                LOG.warn("Recursive bypass of pid={}", v.getProcId(), e);
1045              }
1046            });
1047        } else {
1048          LOG.debug("{} has children, skipping bypass", procedure);
1049          return false;
1050        }
1051      }
1052
1053      // If the procedure has no parent or no child, we are safe to bypass it in whatever state
1054      if (
1055        procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE
1056          && procedure.getState() != ProcedureState.WAITING
1057          && procedure.getState() != ProcedureState.WAITING_TIMEOUT
1058      ) {
1059        LOG.debug("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states "
1060          + "(with no parent), {}", procedure);
1061        // Question: how is the bypass done here?
1062        return false;
1063      }
1064
1065      // Now, the procedure is not finished, and no one can execute it since we take the lock now
1066      // And we can be sure that its ancestor is not running too, since their child has not
1067      // finished yet
1068      Procedure<TEnvironment> current = procedure;
1069      while (current != null) {
1070        LOG.debug("Bypassing {}", current);
1071        current.bypass(getEnvironment());
1072        store.update(current);
1073        long parentID = current.getParentProcId();
1074        current = getProcedure(parentID);
1075      }
1076
1077      // wake up waiting procedure, already checked there is no child
1078      if (procedure.getState() == ProcedureState.WAITING) {
1079        procedure.setState(ProcedureState.RUNNABLE);
1080        store.update(procedure);
1081      }
1082
1083      // If state of procedure is WAITING_TIMEOUT, we can directly submit it to the scheduler.
1084      // Instead we should remove it from timeout Executor queue and tranfer its state to RUNNABLE
1085      if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
1086        LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure);
1087        if (timeoutExecutor.remove(procedure)) {
1088          LOG.debug("removed procedure {} from timeoutExecutor", procedure);
1089          timeoutExecutor.executeTimedoutProcedure(procedure);
1090        }
1091      } else if (lockEntry != null) {
1092        scheduler.addFront(procedure);
1093        LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure);
1094      } else {
1095        // If we don't have the lock, we can't re-submit the queue,
1096        // since it is already executing. To get rid of the stuck situation, we
1097        // need to restart the master. With the procedure set to bypass, the procedureExecutor
1098        // will bypass it and won't get stuck again.
1099        LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, "
1100          + "skipping add to queue", procedure);
1101      }
1102      return true;
1103
1104    } finally {
1105      if (lockEntry != null) {
1106        procExecutionLock.releaseLockEntry(lockEntry);
1107      }
1108    }
1109  }
1110
1111  /**
1112   * Add a new root-procedure to the executor.
1113   * @param proc     the new procedure to execute.
1114   * @param nonceKey the registered unique identifier for this operation from the client or process.
1115   * @return the procedure id, that can be used to monitor the operation
1116   */
1117  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
1118      justification = "FindBugs is blind to the check-for-null")
1119  public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) {
1120    Preconditions.checkArgument(lastProcId.get() >= 0);
1121
1122    prepareProcedure(proc);
1123
1124    final Long currentProcId;
1125    if (nonceKey != null) {
1126      currentProcId = nonceKeysToProcIdsMap.get(nonceKey);
1127      Preconditions.checkArgument(currentProcId != null,
1128        "Expected nonceKey=" + nonceKey + " to be reserved, use registerNonce(); proc=" + proc);
1129    } else {
1130      currentProcId = nextProcId();
1131    }
1132
1133    // Initialize the procedure
1134    proc.setNonceKey(nonceKey);
1135    proc.setProcId(currentProcId.longValue());
1136
1137    // Commit the transaction
1138    store.insert(proc, null);
1139    LOG.debug("Stored {}", proc);
1140
1141    // Add the procedure to the executor
1142    return pushProcedure(proc);
1143  }
1144
1145  /**
1146   * Add a set of new root-procedure to the executor.
1147   * @param procs the new procedures to execute.
1148   */
1149  // TODO: Do we need to take nonces here?
1150  public void submitProcedures(Procedure<TEnvironment>[] procs) {
1151    Preconditions.checkArgument(lastProcId.get() >= 0);
1152    if (procs == null || procs.length <= 0) {
1153      return;
1154    }
1155
1156    // Prepare procedure
1157    for (int i = 0; i < procs.length; ++i) {
1158      prepareProcedure(procs[i]).setProcId(nextProcId());
1159    }
1160
1161    // Commit the transaction
1162    store.insert(procs);
1163    if (LOG.isDebugEnabled()) {
1164      LOG.debug("Stored " + Arrays.toString(procs));
1165    }
1166
1167    // Add the procedure to the executor
1168    for (int i = 0; i < procs.length; ++i) {
1169      pushProcedure(procs[i]);
1170    }
1171  }
1172
1173  private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) {
1174    Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
1175    Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
1176    if (this.checkOwnerSet) {
1177      Preconditions.checkArgument(proc.hasOwner(), "missing owner");
1178    }
1179    return proc;
1180  }
1181
1182  private long pushProcedure(Procedure<TEnvironment> proc) {
1183    final long currentProcId = proc.getProcId();
1184
1185    // Update metrics on start of a procedure
1186    proc.updateMetricsOnSubmit(getEnvironment());
1187
1188    // Create the rollback stack for the procedure
1189    RootProcedureState<TEnvironment> stack = new RootProcedureState<>();
1190    stack.setRollbackSupported(proc.isRollbackSupported());
1191    rollbackStack.put(currentProcId, stack);
1192
1193    // Submit the new subprocedures
1194    assert !procedures.containsKey(currentProcId);
1195    procedures.put(currentProcId, proc);
1196    sendProcedureAddedNotification(currentProcId);
1197    scheduler.addBack(proc);
1198    return proc.getProcId();
1199  }
1200
1201  /**
1202   * Send an abort notification the specified procedure. Depending on the procedure implementation
1203   * the abort can be considered or ignored.
1204   * @param procId the procedure to abort
1205   * @return true if the procedure exists and has received the abort, otherwise false.
1206   */
1207  public boolean abort(long procId) {
1208    return abort(procId, true);
1209  }
1210
1211  /**
1212   * Send an abort notification to the specified procedure. Depending on the procedure
1213   * implementation, the abort can be considered or ignored.
1214   * @param procId                the procedure to abort
1215   * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
1216   * @return true if the procedure exists and has received the abort, otherwise false.
1217   */
1218  public boolean abort(long procId, boolean mayInterruptIfRunning) {
1219    Procedure<TEnvironment> proc = procedures.get(procId);
1220    if (proc != null) {
1221      if (!mayInterruptIfRunning && proc.wasExecuted()) {
1222        return false;
1223      }
1224      return proc.abort(getEnvironment());
1225    }
1226    return false;
1227  }
1228
1229  // ==========================================================================
1230  // Executor query helpers
1231  // ==========================================================================
1232  public Procedure<TEnvironment> getProcedure(final long procId) {
1233    return procedures.get(procId);
1234  }
1235
1236  public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) {
1237    Procedure<TEnvironment> proc = getProcedure(procId);
1238    if (clazz.isInstance(proc)) {
1239      return clazz.cast(proc);
1240    }
1241    return null;
1242  }
1243
1244  public Procedure<TEnvironment> getResult(long procId) {
1245    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1246    if (retainer == null) {
1247      return null;
1248    } else {
1249      return retainer.getProcedure();
1250    }
1251  }
1252
1253  /**
1254   * Return true if the procedure is finished. The state may be "completed successfully" or "failed
1255   * and rolledback". Use getResult() to check the state or get the result data.
1256   * @param procId the ID of the procedure to check
1257   * @return true if the procedure execution is finished, otherwise false.
1258   */
1259  public boolean isFinished(final long procId) {
1260    return !procedures.containsKey(procId);
1261  }
1262
1263  /**
1264   * Return true if the procedure is started.
1265   * @param procId the ID of the procedure to check
1266   * @return true if the procedure execution is started, otherwise false.
1267   */
1268  public boolean isStarted(long procId) {
1269    Procedure<?> proc = procedures.get(procId);
1270    if (proc == null) {
1271      return completed.get(procId) != null;
1272    }
1273    return proc.wasExecuted();
1274  }
1275
1276  /**
1277   * Mark the specified completed procedure, as ready to remove.
1278   * @param procId the ID of the procedure to remove
1279   */
1280  public void removeResult(long procId) {
1281    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1282    if (retainer == null) {
1283      assert !procedures.containsKey(procId) : "pid=" + procId + " is still running";
1284      LOG.debug("pid={} already removed by the cleaner.", procId);
1285      return;
1286    }
1287
1288    // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired.
1289    retainer.setClientAckTime(EnvironmentEdgeManager.currentTime());
1290  }
1291
1292  public Procedure<TEnvironment> getResultOrProcedure(long procId) {
1293    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1294    if (retainer == null) {
1295      return procedures.get(procId);
1296    } else {
1297      return retainer.getProcedure();
1298    }
1299  }
1300
1301  /**
1302   * Check if the user is this procedure's owner
1303   * @param procId the target procedure
1304   * @param user   the user
1305   * @return true if the user is the owner of the procedure, false otherwise or the owner is
1306   *         unknown.
1307   */
1308  public boolean isProcedureOwner(long procId, User user) {
1309    if (user == null) {
1310      return false;
1311    }
1312    final Procedure<TEnvironment> runningProc = procedures.get(procId);
1313    if (runningProc != null) {
1314      return runningProc.getOwner().equals(user.getShortName());
1315    }
1316
1317    final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1318    if (retainer != null) {
1319      return retainer.getProcedure().getOwner().equals(user.getShortName());
1320    }
1321
1322    // Procedure either does not exist or has already completed and got cleaned up.
1323    // At this time, we cannot check the owner of the procedure
1324    return false;
1325  }
1326
1327  /**
1328   * Should only be used when starting up, where the procedure workers have not been started.
1329   * <p/>
1330   * If the procedure works has been started, the return values maybe changed when you are
1331   * processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as
1332   * it will do a copy, and also include the finished procedures.
1333   */
1334  public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() {
1335    return procedures.values();
1336  }
1337
1338  /**
1339   * Get procedures.
1340   * @return the procedures in a list
1341   */
1342  public List<Procedure<TEnvironment>> getProcedures() {
1343    List<Procedure<TEnvironment>> procedureList =
1344      new ArrayList<>(procedures.size() + completed.size());
1345    procedureList.addAll(procedures.values());
1346    // Note: The procedure could show up twice in the list with different state, as
1347    // it could complete after we walk through procedures list and insert into
1348    // procedureList - it is ok, as we will use the information in the Procedure
1349    // to figure it out; to prevent this would increase the complexity of the logic.
1350    completed.values().stream().map(CompletedProcedureRetainer::getProcedure)
1351      .forEach(procedureList::add);
1352    return procedureList;
1353  }
1354
1355  // ==========================================================================
1356  // Listeners helpers
1357  // ==========================================================================
1358  public void registerListener(ProcedureExecutorListener listener) {
1359    this.listeners.add(listener);
1360  }
1361
1362  public boolean unregisterListener(ProcedureExecutorListener listener) {
1363    return this.listeners.remove(listener);
1364  }
1365
1366  private void sendProcedureLoadedNotification(final long procId) {
1367    if (!this.listeners.isEmpty()) {
1368      for (ProcedureExecutorListener listener : this.listeners) {
1369        try {
1370          listener.procedureLoaded(procId);
1371        } catch (Throwable e) {
1372          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1373        }
1374      }
1375    }
1376  }
1377
1378  private void sendProcedureAddedNotification(final long procId) {
1379    if (!this.listeners.isEmpty()) {
1380      for (ProcedureExecutorListener listener : this.listeners) {
1381        try {
1382          listener.procedureAdded(procId);
1383        } catch (Throwable e) {
1384          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1385        }
1386      }
1387    }
1388  }
1389
1390  private void sendProcedureFinishedNotification(final long procId) {
1391    if (!this.listeners.isEmpty()) {
1392      for (ProcedureExecutorListener listener : this.listeners) {
1393        try {
1394          listener.procedureFinished(procId);
1395        } catch (Throwable e) {
1396          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1397        }
1398      }
1399    }
1400  }
1401
1402  // ==========================================================================
1403  // Procedure IDs helpers
1404  // ==========================================================================
1405  private long nextProcId() {
1406    long procId = lastProcId.incrementAndGet();
1407    if (procId < 0) {
1408      while (!lastProcId.compareAndSet(procId, 0)) {
1409        procId = lastProcId.get();
1410        if (procId >= 0) {
1411          break;
1412        }
1413      }
1414      while (procedures.containsKey(procId)) {
1415        procId = lastProcId.incrementAndGet();
1416      }
1417    }
1418    assert procId >= 0 : "Invalid procId " + procId;
1419    return procId;
1420  }
1421
1422  protected long getLastProcId() {
1423    return lastProcId.get();
1424  }
1425
1426  public Set<Long> getActiveProcIds() {
1427    return procedures.keySet();
1428  }
1429
1430  Long getRootProcedureId(Procedure<TEnvironment> proc) {
1431    return Procedure.getRootProcedureId(procedures, proc);
1432  }
1433
1434  // ==========================================================================
1435  // Executions
1436  // ==========================================================================
  /**
   * Run one step of {@code proc} on the calling worker thread.
   * <p/>
   * Looks up the rollback stack of the procedure's root. If execution cannot be acquired on that
   * stack, a rollback is attempted instead. Otherwise the procedure's lock is taken and
   * {@code execProcedure} is invoked. On success the procedure is finalized, either as the root
   * or as a sub-procedure. The loop repeats while the root stack reports a failure, so a failed
   * step leads into the rollback path on the next iteration.
   */
  private void executeProcedure(Procedure<TEnvironment> proc) {
    if (proc.isFinished()) {
      LOG.debug("{} is already finished, skipping execution", proc);
      return;
    }
    final Long rootProcId = getRootProcedureId(proc);
    if (rootProcId == null) {
      // The 'proc' was ready to run but the root procedure was rolledback
      LOG.warn("Rollback because parent is done/rolledback proc=" + proc);
      executeRollback(proc);
      return;
    }

    RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
    if (procStack == null) {
      LOG.warn("RootProcedureState is null for " + proc.getProcId());
      return;
    }
    do {
      // Try to acquire the execution
      if (!procStack.acquire(proc)) {
        if (procStack.setRollback()) {
          // we have the 'rollback-lock' we can start rollingback
          switch (executeRollback(rootProcId, procStack)) {
            case LOCK_ACQUIRED:
              break;
            case LOCK_YIELD_WAIT:
              // give the rollback-lock back and re-queue the procedure for a later attempt
              procStack.unsetRollback();
              scheduler.yield(proc);
              break;
            case LOCK_EVENT_WAIT:
              LOG.info("LOCK_EVENT_WAIT rollback..." + proc);
              procStack.unsetRollback();
              break;
            default:
              throw new UnsupportedOperationException();
          }
        } else {
          // if we can't rollback means that some child is still running.
          // the rollback will be executed after all the children are done.
          // If the procedure was never executed, remove and mark it as rolledback.
          if (!proc.wasExecuted()) {
            switch (executeRollback(proc)) {
              case LOCK_ACQUIRED:
                break;
              case LOCK_YIELD_WAIT:
                scheduler.yield(proc);
                break;
              case LOCK_EVENT_WAIT:
                LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc);
                break;
              default:
                throw new UnsupportedOperationException();
            }
          }
        }
        // execution was not acquired: whatever the rollback outcome, this attempt is over
        break;
      }

      // Execute the procedure
      assert proc.getState() == ProcedureState.RUNNABLE : proc;
      // Note that lock is NOT about concurrency but rather about ensuring
      // ownership of a procedure of an entity such as a region or table
      LockState lockState = acquireLock(proc);
      switch (lockState) {
        case LOCK_ACQUIRED:
          execProcedure(procStack, proc);
          break;
        case LOCK_YIELD_WAIT:
          LOG.info(lockState + " " + proc);
          scheduler.yield(proc);
          break;
        case LOCK_EVENT_WAIT:
          // Someone will wake us up when the lock is available
          LOG.debug(lockState + " " + proc);
          break;
        default:
          throw new UnsupportedOperationException();
      }
      // release the execution acquired on the root stack above, whatever the lock outcome was
      procStack.release(proc);

      if (proc.isSuccess()) {
        // update metrics on finishing the procedure
        proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
        LOG.info("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime()));
        // Finalize the procedure state
        if (proc.getProcId() == rootProcId) {
          rootProcedureFinished(proc);
        } else {
          execCompletionCleanup(proc);
        }
        break;
      }
      // loop again only if the root stack is now marked failed (the next pass takes the
      // rollback path because acquire() is expected to fail then)
    } while (procStack.isFailed());
  }
1532
1533  private LockState acquireLock(Procedure<TEnvironment> proc) {
1534    TEnvironment env = getEnvironment();
1535    // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if
1536    // hasLock is true.
1537    if (proc.hasLock()) {
1538      return LockState.LOCK_ACQUIRED;
1539    }
1540    return proc.doAcquireLock(env, store);
1541  }
1542
1543  private void releaseLock(Procedure<TEnvironment> proc, boolean force) {
1544    TEnvironment env = getEnvironment();
1545    // For how the framework works, we know that we will always have the lock
1546    // when we call releaseLock(), so we can avoid calling proc.hasLock()
1547    if (force || !proc.holdLock(env) || proc.isFinished()) {
1548      proc.doReleaseLock(env, store);
1549    }
1550  }
1551
1552  // Returning null means we have already held the execution lock, so you do not need to get the
1553  // lock entry for releasing
1554  private IdLock.Entry getLockEntryForRollback(long procId) {
1555    // Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need
1556    // this check, as the worker will hold the lock before executing a procedure. This is the only
1557    // place where we may hold two procedure execution locks, and there is a fence in the
1558    // RootProcedureState where we can make sure that only one worker can execute the rollback of
1559    // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to
1560    // prevent race between us and the force update thread.
1561    if (!procExecutionLock.isHeldByCurrentThread(procId)) {
1562      try {
1563        return procExecutionLock.getLockEntry(procId);
1564      } catch (IOException e) {
1565        // can only happen if interrupted, so not a big deal to propagate it
1566        throw new UncheckedIOException(e);
1567      }
1568    }
1569    return null;
1570  }
1571
1572  private void executeUnexpectedRollback(Procedure<TEnvironment> rootProc,
1573    RootProcedureState<TEnvironment> procStack) {
1574    if (procStack.getSubprocs() != null) {
1575      // comparing proc id in reverse order, so we will delete later procedures first, otherwise we
1576      // may delete parent procedure first and if we fail in the middle of this operation, when
1577      // loading we will find some orphan procedures
1578      PriorityQueue<Procedure<TEnvironment>> pq =
1579        new PriorityQueue<>(procStack.getSubprocs().size(),
1580          Comparator.<Procedure<TEnvironment>> comparingLong(Procedure::getProcId).reversed());
1581      pq.addAll(procStack.getSubprocs());
1582      for (;;) {
1583        Procedure<TEnvironment> subproc = pq.poll();
1584        if (subproc == null) {
1585          break;
1586        }
1587        if (!procedures.containsKey(subproc.getProcId())) {
1588          // this means it has already been rolledback
1589          continue;
1590        }
1591        IdLock.Entry lockEntry = getLockEntryForRollback(subproc.getProcId());
1592        try {
1593          cleanupAfterRollbackOneStep(subproc);
1594          execCompletionCleanup(subproc);
1595        } finally {
1596          if (lockEntry != null) {
1597            procExecutionLock.releaseLockEntry(lockEntry);
1598          }
1599        }
1600      }
1601    }
1602    IdLock.Entry lockEntry = getLockEntryForRollback(rootProc.getProcId());
1603    try {
1604      cleanupAfterRollbackOneStep(rootProc);
1605    } finally {
1606      if (lockEntry != null) {
1607        procExecutionLock.releaseLockEntry(lockEntry);
1608      }
1609    }
1610  }
1611
  /**
   * Rolls back the subprocedure stack of a root procedure, from the most recent step to the oldest.
   * <p>
   * Each step runs while holding the execution lock of its procedure, obtained through
   * getLockEntryForRollback. Subprocedures that finished successfully are only cleaned up, never
   * rolled back. For every other step, the procedure lock is acquired, one rollback step is
   * executed, the lock is released where appropriate, and the step is removed from the stack.
   * @param rootProc the root procedure being rolled back
   * @param procStack the rollback state of {@code rootProc}
   * @return {@link LockState#LOCK_ACQUIRED} once the whole stack has been processed. Otherwise the
   *         state that stopped the rollback early: the procedure lock was not available, a rollback
   *         step asked to be retried, or the procedure asked to yield ({@code LOCK_YIELD_WAIT}).
   *         NOTE(review): when the early exit is caused by the executor or store no longer running,
   *         the step's own lock state is returned, and that can be {@code LOCK_ACQUIRED}. The
   *         caller then cannot tell that the rollback was cut short. Confirm this is intended.
   */
  private LockState executeNormalRollback(Procedure<TEnvironment> rootProc,
    RootProcedureState<TEnvironment> procStack) {
    List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack();
    assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;

    // Walk the stack from its tail (the most recently executed step) to its head.
    int stackTail = subprocStack.size();
    while (stackTail-- > 0) {
      Procedure<TEnvironment> proc = subprocStack.get(stackTail);
      // null means this thread already holds the execution lock; see getLockEntryForRollback.
      IdLock.Entry lockEntry = getLockEntryForRollback(proc.getProcId());
      try {
        // For the sub procedures which are successfully finished, we do not rollback them.
        // Typically, if we want to rollback a procedure, we first need to rollback it, and then
        // recursively rollback its ancestors. The state changes which are done by sub procedures
        // should be handled by parent procedures when rolling back. For example, when rolling back
        // a MergeTableProcedure, we will schedule new procedures to bring the offline regions
        // online, instead of rolling back the original procedures which offlined the regions(in
        // fact these procedures can not be rolled back...).
        if (proc.isSuccess()) {
          // Just do the cleanup work, without actually executing the rollback
          subprocStack.remove(stackTail);
          cleanupAfterRollbackOneStep(proc);
          continue;
        }
        LockState lockState = acquireLock(proc);
        if (lockState != LockState.LOCK_ACQUIRED) {
          // can't take a lock on the procedure, add the root-proc back on the
          // queue waiting for the lock availability
          return lockState;
        }

        // Run a single rollback step; LOCK_YIELD_WAIT means the step failed and must be retried.
        lockState = executeRollback(proc);
        releaseLock(proc, false);
        boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;
        abortRollback |= !isRunning() || !store.isRunning();

        // allows to kill the executor before something is stored to the wal.
        // useful to test the procedure recovery.
        if (abortRollback) {
          return lockState;
        }

        // The step is done; drop it so a later retry does not roll it back again.
        subprocStack.remove(stackTail);

        // if the procedure is kind enough to pass the slot to someone else, yield
        // if the proc is already finished, do not yield
        if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
          return LockState.LOCK_YIELD_WAIT;
        }

        // The root procedure is finalized by the caller (rootProcedureFinished), not here.
        if (proc != rootProc) {
          execCompletionCleanup(proc);
        }
      } finally {
        if (lockEntry != null) {
          procExecutionLock.releaseLockEntry(lockEntry);
        }
      }
    }
    return LockState.LOCK_ACQUIRED;
  }
1672
1673  /**
1674   * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the
1675   * root-procedure will be visible as finished to user, and the result will be the fatal exception.
1676   */
1677  private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) {
1678    Procedure<TEnvironment> rootProc = procedures.get(rootProcId);
1679    RemoteProcedureException exception = rootProc.getException();
1680    // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are
1681    // rolling back because the subprocedure does. Clarify.
1682    if (exception == null) {
1683      exception = procStack.getException();
1684      rootProc.setFailure(exception);
1685      store.update(rootProc);
1686    }
1687
1688    if (procStack.isRollbackSupported()) {
1689      LockState lockState = executeNormalRollback(rootProc, procStack);
1690      if (lockState != LockState.LOCK_ACQUIRED) {
1691        return lockState;
1692      }
1693    } else {
1694      // the procedure does not support rollback, so typically we should not reach here, this
1695      // usually means there are code bugs, let's just wait all the subprocedures to finish and then
1696      // mark the root procedure as failure.
1697      LOG.error(HBaseMarkers.FATAL,
1698        "Root Procedure {} does not support rollback but the execution failed"
1699          + " and try to rollback, code bug?",
1700        rootProc, exception);
1701      executeUnexpectedRollback(rootProc, procStack);
1702    }
1703
1704    IdLock.Entry lockEntry = getLockEntryForRollback(rootProc.getProcId());
1705    try {
1706      // Finalize the procedure state
1707      LOG.info("Rolled back {} exec-time={}", rootProc,
1708        StringUtils.humanTimeDiff(rootProc.elapsedTime()));
1709      rootProcedureFinished(rootProc);
1710    } finally {
1711      if (lockEntry != null) {
1712        procExecutionLock.releaseLockEntry(lockEntry);
1713      }
1714    }
1715
1716    return LockState.LOCK_ACQUIRED;
1717  }
1718
1719  private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) {
1720    if (testing != null && testing.shouldKillBeforeStoreUpdateInRollback()) {
1721      kill("TESTING: Kill BEFORE store update in rollback: " + proc);
1722    }
1723    if (proc.removeStackIndex()) {
1724      if (!proc.isSuccess()) {
1725        proc.setState(ProcedureState.ROLLEDBACK);
1726      }
1727
1728      // update metrics on finishing the procedure (fail)
1729      proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), false);
1730
1731      if (proc.hasParent()) {
1732        store.delete(proc.getProcId());
1733        procedures.remove(proc.getProcId());
1734      } else {
1735        final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds();
1736        if (childProcIds != null) {
1737          store.delete(proc, childProcIds);
1738        } else {
1739          store.update(proc);
1740        }
1741      }
1742    } else {
1743      store.update(proc);
1744    }
1745  }
1746
1747  /**
1748   * Execute the rollback of the procedure step. It updates the store with the new state (stack
1749   * index) or will remove completly the procedure in case it is a child.
1750   */
1751  private LockState executeRollback(Procedure<TEnvironment> proc) {
1752    try {
1753      proc.doRollback(getEnvironment());
1754    } catch (IOException e) {
1755      LOG.debug("Roll back attempt failed for {}", proc, e);
1756      return LockState.LOCK_YIELD_WAIT;
1757    } catch (InterruptedException e) {
1758      handleInterruptedException(proc, e);
1759      return LockState.LOCK_YIELD_WAIT;
1760    } catch (Throwable e) {
1761      // Catch NullPointerExceptions or similar errors...
1762      LOG.error(HBaseMarkers.FATAL, "CODE-BUG: Uncaught runtime exception for " + proc, e);
1763    }
1764
1765    cleanupAfterRollbackOneStep(proc);
1766
1767    return LockState.LOCK_ACQUIRED;
1768  }
1769
1770  private void yieldProcedure(Procedure<TEnvironment> proc) {
1771    releaseLock(proc, false);
1772    scheduler.yield(proc);
1773  }
1774
1775  /**
1776   * Executes <code>procedure</code>
1777   * <ul>
1778   * <li>Calls the doExecute() of the procedure
1779   * <li>If the procedure execution didn't fail (i.e. valid user input)
1780   * <ul>
1781   * <li>...and returned subprocedures
1782   * <ul>
1783   * <li>The subprocedures are initialized.
1784   * <li>The subprocedures are added to the store
1785   * <li>The subprocedures are added to the runnable queue
1786   * <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete
1787   * </ul>
1788   * </li>
1789   * <li>...if there are no subprocedure
1790   * <ul>
1791   * <li>the procedure completed successfully
1792   * <li>if there is a parent (WAITING)
1793   * <li>the parent state will be set to RUNNABLE
1794   * </ul>
1795   * </li>
1796   * </ul>
1797   * </li>
1798   * <li>In case of failure
1799   * <ul>
1800   * <li>The store is updated with the new state</li>
1801   * <li>The executor (caller of this method) will start the rollback of the procedure</li>
1802   * </ul>
1803   * </li>
1804   * </ul>
1805   */
1806  private void execProcedure(RootProcedureState<TEnvironment> procStack,
1807    Procedure<TEnvironment> procedure) {
1808    Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
1809      "NOT RUNNABLE! " + procedure.toString());
1810
1811    // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
1812    // The exception is caught below and then we hurry to the exit without disturbing state. The
1813    // idea is that the processing of this procedure will be unsuspended later by an external event
1814    // such the report of a region open.
1815    boolean suspended = false;
1816
1817    // Whether to 're-' -execute; run through the loop again.
1818    boolean reExecute = false;
1819
1820    Procedure<TEnvironment>[] subprocs = null;
1821    do {
1822      reExecute = false;
1823      procedure.resetPersistence();
1824      try {
1825        subprocs = procedure.doExecute(getEnvironment());
1826        if (subprocs != null && subprocs.length == 0) {
1827          subprocs = null;
1828        }
1829      } catch (ProcedureSuspendedException e) {
1830        LOG.trace("Suspend {}", procedure);
1831        suspended = true;
1832      } catch (ProcedureYieldException e) {
1833        LOG.trace("Yield {}", procedure, e);
1834        yieldProcedure(procedure);
1835        return;
1836      } catch (InterruptedException e) {
1837        LOG.trace("Yield interrupt {}", procedure, e);
1838        handleInterruptedException(procedure, e);
1839        yieldProcedure(procedure);
1840        return;
1841      } catch (Throwable e) {
1842        // Catch NullPointerExceptions or similar errors...
1843        String msg = "CODE-BUG: Uncaught runtime exception: " + procedure;
1844        LOG.error(msg, e);
1845        procedure.setFailure(new RemoteProcedureException(msg, e));
1846      }
1847
1848      if (!procedure.isFailed()) {
1849        if (subprocs != null) {
1850          if (subprocs.length == 1 && subprocs[0] == procedure) {
1851            // Procedure returned itself. Quick-shortcut for a state machine-like procedure;
1852            // i.e. we go around this loop again rather than go back out on the scheduler queue.
1853            subprocs = null;
1854            reExecute = true;
1855            LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId());
1856          } else {
1857            // Yield the current procedure, and make the subprocedure runnable
1858            // subprocs may come back 'null'.
1859            subprocs = initializeChildren(procStack, procedure, subprocs);
1860            LOG.info("Initialized subprocedures=" + (subprocs == null
1861              ? null
1862              : Stream.of(subprocs).map(e -> "{" + e.toString() + "}").collect(Collectors.toList())
1863                .toString()));
1864          }
1865        } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
1866          LOG.trace("Added to timeoutExecutor {}", procedure);
1867          timeoutExecutor.add(procedure);
1868        } else if (!suspended) {
1869          // No subtask, so we are done
1870          procedure.setState(ProcedureState.SUCCESS);
1871        }
1872      }
1873
1874      // allows to kill the executor before something is stored to the wal.
1875      // useful to test the procedure recovery.
1876      if (
1877        testing != null && testing.shouldKillBeforeStoreUpdate(suspended, procedure.hasParent())
1878      ) {
1879        kill("TESTING: Kill BEFORE store update: " + procedure);
1880      }
1881
1882      // TODO: The code here doesn't check if store is running before persisting to the store as
1883      // it relies on the method call below to throw RuntimeException to wind up the stack and
1884      // executor thread to stop. The statement following the method call below seems to check if
1885      // store is not running, to prevent scheduling children procedures, re-execution or yield
1886      // of this procedure. This may need more scrutiny and subsequent cleanup in future
1887      //
1888      // Commit the transaction even if a suspend (state may have changed). Note this append
1889      // can take a bunch of time to complete.
1890      if (procedure.needPersistence()) {
1891        // Add the procedure to the stack
1892        // See HBASE-28210 on why we need synchronized here
1893        boolean needUpdateStoreOutsideLock = false;
1894        synchronized (procStack) {
1895          if (procStack.addRollbackStep(procedure)) {
1896            updateStoreOnExec(procStack, procedure, subprocs);
1897          } else {
1898            needUpdateStoreOutsideLock = true;
1899          }
1900        }
1901        // this is an optimization if we do not need to maintain rollback step, as all subprocedures
1902        // of the same root procedure share the same root procedure state, if we can only update
1903        // store under the above lock, the sub procedures of the same root procedure can only be
1904        // persistent sequentially, which will have a bad performance. See HBASE-28212 for more
1905        // details.
1906        if (needUpdateStoreOutsideLock) {
1907          updateStoreOnExec(procStack, procedure, subprocs);
1908        }
1909      }
1910
1911      // if the store is not running we are aborting
1912      if (!store.isRunning()) {
1913        return;
1914      }
1915      // if the procedure is kind enough to pass the slot to someone else, yield
1916      if (
1917        procedure.isRunnable() && !suspended
1918          && procedure.isYieldAfterExecutionStep(getEnvironment())
1919      ) {
1920        yieldProcedure(procedure);
1921        return;
1922      }
1923
1924      assert (reExecute && subprocs == null) || !reExecute;
1925    } while (reExecute);
1926
1927    // Allows to kill the executor after something is stored to the WAL but before the below
1928    // state settings are done -- in particular the one on the end where we make parent
1929    // RUNNABLE again when its children are done; see countDownChildren.
1930    if (testing != null && testing.shouldKillAfterStoreUpdate(suspended)) {
1931      kill("TESTING: Kill AFTER store update: " + procedure);
1932    }
1933
1934    // Submit the new subprocedures
1935    if (subprocs != null && !procedure.isFailed()) {
1936      submitChildrenProcedures(subprocs);
1937    }
1938
1939    // we need to log the release lock operation before waking up the parent procedure, as there
1940    // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all
1941    // the sub procedures from store and cause problems...
1942    releaseLock(procedure, false);
1943
1944    // if the procedure is complete and has a parent, count down the children latch.
1945    // If 'suspended', do nothing to change state -- let other threads handle unsuspend event.
1946    if (!suspended && procedure.isFinished() && procedure.hasParent()) {
1947      countDownChildren(procStack, procedure);
1948    }
1949  }
1950
1951  private void kill(String msg) {
1952    LOG.debug(msg);
1953    stop();
1954    throw new RuntimeException(msg);
1955  }
1956
1957  private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack,
1958    Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
1959    assert subprocs != null : "expected subprocedures";
1960    final long rootProcId = getRootProcedureId(procedure);
1961    for (int i = 0; i < subprocs.length; ++i) {
1962      Procedure<TEnvironment> subproc = subprocs[i];
1963      if (subproc == null) {
1964        String msg = "subproc[" + i + "] is null, aborting the procedure";
1965        procedure
1966          .setFailure(new RemoteProcedureException(msg, new IllegalArgumentIOException(msg)));
1967        return null;
1968      }
1969
1970      assert subproc.getState() == ProcedureState.INITIALIZING : subproc;
1971      subproc.setParentProcId(procedure.getProcId());
1972      subproc.setRootProcId(rootProcId);
1973      subproc.setProcId(nextProcId());
1974      procStack.addSubProcedure(subproc);
1975    }
1976
1977    if (!procedure.isFailed()) {
1978      procedure.setChildrenLatch(subprocs.length);
1979      switch (procedure.getState()) {
1980        case RUNNABLE:
1981          procedure.setState(ProcedureState.WAITING);
1982          break;
1983        case WAITING_TIMEOUT:
1984          timeoutExecutor.add(procedure);
1985          break;
1986        default:
1987          break;
1988      }
1989    }
1990    return subprocs;
1991  }
1992
1993  private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) {
1994    for (int i = 0; i < subprocs.length; ++i) {
1995      Procedure<TEnvironment> subproc = subprocs[i];
1996      subproc.updateMetricsOnSubmit(getEnvironment());
1997      assert !procedures.containsKey(subproc.getProcId());
1998      procedures.put(subproc.getProcId(), subproc);
1999      scheduler.addFront(subproc);
2000    }
2001  }
2002
2003  private void countDownChildren(RootProcedureState<TEnvironment> procStack,
2004    Procedure<TEnvironment> procedure) {
2005    Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId());
2006    if (parent == null) {
2007      assert procStack.isRollingback();
2008      return;
2009    }
2010
2011    // If this procedure is the last child awake the parent procedure
2012    if (parent.tryRunnable()) {
2013      // If we succeeded in making the parent runnable -- i.e. all of its
2014      // children have completed, move parent to front of the queue.
2015      store.update(parent);
2016      scheduler.addFront(parent);
2017      LOG.info("Finished subprocedure pid={}, resume processing ppid={}", procedure.getProcId(),
2018        parent.getProcId());
2019      return;
2020    }
2021  }
2022
2023  private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack,
2024    Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
2025    if (subprocs != null && !procedure.isFailed()) {
2026      if (LOG.isTraceEnabled()) {
2027        LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs));
2028      }
2029      store.insert(procedure, subprocs);
2030    } else {
2031      LOG.trace("Store update {}", procedure);
2032      if (procedure.isFinished() && !procedure.hasParent()) {
2033        // remove child procedures
2034        final long[] childProcIds = procStack.getSubprocedureIds();
2035        if (childProcIds != null) {
2036          store.delete(procedure, childProcIds);
2037          for (int i = 0; i < childProcIds.length; ++i) {
2038            procedures.remove(childProcIds[i]);
2039          }
2040        } else {
2041          store.update(procedure);
2042        }
2043      } else {
2044        store.update(procedure);
2045      }
2046    }
2047  }
2048
2049  private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) {
2050    LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e);
2051    // NOTE: We don't call Thread.currentThread().interrupt()
2052    // because otherwise all the subsequent calls e.g. Thread.sleep() will throw
2053    // the InterruptedException. If the master is going down, we will be notified
2054    // and the executor/store will be stopped.
2055    // (The interrupted procedure will be retried on the next run)
2056  }
2057
2058  private void execCompletionCleanup(Procedure<TEnvironment> proc) {
2059    final TEnvironment env = getEnvironment();
2060    if (proc.hasLock()) {
2061      LOG.warn("Usually this should not happen, we will release the lock before if the procedure"
2062        + " is finished, even if the holdLock is true, arrive here means we have some holes where"
2063        + " we do not release the lock. And the releaseLock below may fail since the procedure may"
2064        + " have already been deleted from the procedure store.");
2065      releaseLock(proc, true);
2066    }
2067    try {
2068      proc.completionCleanup(env);
2069    } catch (Throwable e) {
2070      // Catch NullPointerExceptions or similar errors...
2071      LOG.error("CODE-BUG: uncaught runtime exception for procedure: " + proc, e);
2072    }
2073
2074    // call schedulers completion cleanup, we have some special clean up logic in this method so if
2075    // it throws any exceptions, we can not just ignore it like the above procedure's cleanup
2076    scheduler.completionCleanup(proc);
2077  }
2078
2079  private void rootProcedureFinished(Procedure<TEnvironment> proc) {
2080    // call the procedure completion cleanup handler
2081    execCompletionCleanup(proc);
2082
2083    CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc);
2084
2085    // update the executor internal state maps
2086    if (!proc.shouldWaitClientAck(getEnvironment())) {
2087      retainer.setClientAckTime(0);
2088    }
2089
2090    completed.put(proc.getProcId(), retainer);
2091    rollbackStack.remove(proc.getProcId());
2092    procedures.remove(proc.getProcId());
2093
2094    // Notify the listeners
2095    sendProcedureFinishedNotification(proc.getProcId());
2096  }
2097
2098  RootProcedureState<TEnvironment> getProcStack(long rootProcId) {
2099    return rollbackStack.get(rootProcId);
2100  }
2101
2102  ProcedureScheduler getProcedureScheduler() {
2103    return scheduler;
2104  }
2105
2106  int getCompletedSize() {
2107    return completed.size();
2108  }
2109
2110  public IdLock getProcExecutionLock() {
2111    return procExecutionLock;
2112  }
2113
2114  /**
2115   * Get a thread pool for executing some asynchronous tasks
2116   */
2117  public ExecutorService getAsyncTaskExecutor() {
2118    return asyncTaskExecutor;
2119  }
2120
2121  // ==========================================================================
2122  // Worker Thread
2123  // ==========================================================================
2124  private class WorkerThread extends StoppableThread {
2125    private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
2126    private volatile Procedure<TEnvironment> activeProcedure;
2127
2128    public WorkerThread(ThreadGroup group) {
2129      this(group, "PEWorker-");
2130    }
2131
2132    protected WorkerThread(ThreadGroup group, String prefix) {
2133      super(group, prefix + workerId.incrementAndGet());
2134      setDaemon(true);
2135    }
2136
2137    @Override
2138    public void sendStopSignal() {
2139      scheduler.signalAll();
2140    }
2141
2142    /**
2143     * Encapsulates execution of the current {@link #activeProcedure} for easy tracing.
2144     */
2145    private long runProcedure() throws IOException {
2146      final Procedure<TEnvironment> proc = this.activeProcedure;
2147      int activeCount = activeExecutorCount.incrementAndGet();
2148      int runningCount = store.setRunningProcedureCount(activeCount);
2149      LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(), runningCount,
2150        activeCount);
2151      executionStartTime.set(EnvironmentEdgeManager.currentTime());
2152      IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
2153      try {
2154        executeProcedure(proc);
2155      } catch (AssertionError e) {
2156        LOG.info("ASSERT pid=" + proc.getProcId(), e);
2157        throw e;
2158      } finally {
2159        procExecutionLock.releaseLockEntry(lockEntry);
2160        activeCount = activeExecutorCount.decrementAndGet();
2161        runningCount = store.setRunningProcedureCount(activeCount);
2162        LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(), runningCount,
2163          activeCount);
2164        this.activeProcedure = null;
2165        executionStartTime.set(Long.MAX_VALUE);
2166      }
2167      return EnvironmentEdgeManager.currentTime();
2168    }
2169
2170    @Override
2171    public void run() {
2172      long lastUpdate = EnvironmentEdgeManager.currentTime();
2173      try {
2174        while (isRunning() && keepAlive(lastUpdate)) {
2175          @SuppressWarnings("unchecked")
2176          Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
2177          if (proc == null) {
2178            continue;
2179          }
2180          this.activeProcedure = proc;
2181          lastUpdate = TraceUtil.trace(this::runProcedure, new ProcedureSpanBuilder(proc));
2182        }
2183      } catch (Throwable t) {
2184        LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);
2185      } finally {
2186        LOG.trace("Worker terminated.");
2187      }
2188      workerThreads.remove(this);
2189    }
2190
2191    @Override
2192    public String toString() {
2193      Procedure<?> p = this.activeProcedure;
2194      return getName() + "(pid=" + (p == null ? Procedure.NO_PROC_ID : p.getProcId() + ")");
2195    }
2196
2197    /** Returns the time since the current procedure is running */
2198    public long getCurrentRunTime() {
2199      return EnvironmentEdgeManager.currentTime() - executionStartTime.get();
2200    }
2201
2202    // core worker never timeout
2203    protected boolean keepAlive(long lastUpdate) {
2204      return true;
2205    }
2206  }
2207
2208  // A worker thread which can be added when core workers are stuck. Will timeout after
2209  // keepAliveTime if there is no procedure to run.
2210  private final class KeepAliveWorkerThread extends WorkerThread {
2211    public KeepAliveWorkerThread(ThreadGroup group) {
2212      super(group, "KeepAlivePEWorker-");
2213    }
2214
2215    @Override
2216    protected boolean keepAlive(long lastUpdate) {
2217      return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime;
2218    }
2219  }
2220
2221  // ----------------------------------------------------------------------------
2222  // TODO-MAYBE: Should we provide a InlineChore to notify the store with the
2223  // full set of procedures pending and completed to write a compacted
2224  // version of the log (in case is a log)?
2225  // In theory no, procedures are have a short life, so at some point the store
2226  // will have the tracker saying everything is in the last log.
2227  // ----------------------------------------------------------------------------
2228
2229  private final class WorkerMonitor extends InlineChore {
    // Configuration key and default (ms) for the monitor interval.
    public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =
      "hbase.procedure.worker.monitor.interval.msec";
    private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec

    // Configuration key and default (ms): a worker whose current run time is at least this value
    // is counted as stuck by checkForStuckWorkers().
    public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =
      "hbase.procedure.worker.stuck.threshold.msec";
    private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec

    // Configuration key and default: checkThreadCount() adds a keep-alive worker when the fraction
    // of stuck workers is at least this value.
    public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =
      "hbase.procedure.worker.add.stuck.percentage";
    private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck

    // Effective settings, starting at the defaults. They are presumably re-read from the
    // configuration by refreshConfig() (called on construction and after every run) -- TODO confirm.
    private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;
    private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;
    private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;
2245
2246    public WorkerMonitor() {
2247      refreshConfig();
2248    }
2249
2250    @Override
2251    public void run() {
2252      final int stuckCount = checkForStuckWorkers();
2253      checkThreadCount(stuckCount);
2254
2255      // refresh interval (poor man dynamic conf update)
2256      refreshConfig();
2257    }
2258
2259    private int checkForStuckWorkers() {
2260      // check if any of the worker is stuck
2261      int stuckCount = 0;
2262      for (WorkerThread worker : workerThreads) {
2263        if (worker.getCurrentRunTime() < stuckThreshold) {
2264          continue;
2265        }
2266
2267        // WARN the worker is stuck
2268        stuckCount++;
2269        LOG.warn("Worker stuck {}, run time {}", worker,
2270          StringUtils.humanTimeDiff(worker.getCurrentRunTime()));
2271      }
2272      return stuckCount;
2273    }
2274
2275    private void checkThreadCount(final int stuckCount) {
2276      // nothing to do if there are no runnable tasks
2277      if (stuckCount < 1 || !scheduler.hasRunnables()) {
2278        return;
2279      }
2280
2281      // add a new thread if the worker stuck percentage exceed the threshold limit
2282      // and every handler is active.
2283      final float stuckPerc = ((float) stuckCount) / workerThreads.size();
2284      // let's add new worker thread more aggressively, as they will timeout finally if there is no
2285      // work to do.
2286      if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) {
2287        final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
2288        workerThreads.add(worker);
2289        worker.start();
2290        LOG.debug("Added new worker thread {}", worker);
2291      }
2292    }
2293
2294    private void refreshConfig() {
2295      addWorkerStuckPercentage =
2296        conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY, DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);
2297      timeoutInterval =
2298        conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY, DEFAULT_WORKER_MONITOR_INTERVAL);
2299      stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY, DEFAULT_WORKER_STUCK_THRESHOLD);
2300    }
2301
2302    @Override
2303    public int getTimeoutInterval() {
2304      return timeoutInterval;
2305    }
2306  }
2307}