001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import edu.umd.cs.findbugs.annotations.Nullable;
021import java.io.IOException;
022import java.io.UncheckedIOException;
023import java.util.ArrayDeque;
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.Collection;
027import java.util.Comparator;
028import java.util.Deque;
029import java.util.HashSet;
030import java.util.List;
031import java.util.PriorityQueue;
032import java.util.Set;
033import java.util.concurrent.ConcurrentHashMap;
034import java.util.concurrent.CopyOnWriteArrayList;
035import java.util.concurrent.Executor;
036import java.util.concurrent.Executors;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicBoolean;
039import java.util.concurrent.atomic.AtomicInteger;
040import java.util.concurrent.atomic.AtomicLong;
041import java.util.stream.Collectors;
042import java.util.stream.Stream;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
046import org.apache.hadoop.hbase.log.HBaseMarkers;
047import org.apache.hadoop.hbase.procedure2.Procedure.LockState;
048import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
049import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
050import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
051import org.apache.hadoop.hbase.procedure2.trace.ProcedureSpanBuilder;
052import org.apache.hadoop.hbase.procedure2.util.StringUtils;
053import org.apache.hadoop.hbase.security.User;
054import org.apache.hadoop.hbase.trace.TraceUtil;
055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
056import org.apache.hadoop.hbase.util.IdLock;
057import org.apache.hadoop.hbase.util.NonceKey;
058import org.apache.hadoop.hbase.util.Threads;
059import org.apache.yetus.audience.InterfaceAudience;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
064import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
065
066import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
067
068/**
 * Thread Pool that executes the submitted procedures. The executor has a ProcedureStore associated.
 * Each operation is logged and on restart the pending procedures are resumed. Unless the Procedure
 * code throws an error (e.g. invalid user input) the procedure will complete (at some point in
 * time). On restart the pending procedures are resumed and the ones that failed will be rolled
 * back. The user can add procedures to the executor via submitProcedure(proc), check for the
 * finished state via isFinished(procId) and get the result via getResult(procId).
075 */
076@InterfaceAudience.Private
077public class ProcedureExecutor<TEnvironment> {
  /** Logger shared by the executor and its nested helper classes. */
  private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class);

  /** Configuration key read once in the constructor to initialize {@code checkOwnerSet}. */
  public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set";
  /** Value used when {@link #CHECK_OWNER_SET_CONF_KEY} is not configured. */
  private static final boolean DEFAULT_CHECK_OWNER_SET = false;

  /**
   * Configuration key for the worker keep-alive time, in milliseconds. It is (re)read by
   * {@link #refreshConfiguration(Configuration)}.
   */
  public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY =
    "hbase.procedure.worker.keep.alive.time.msec";
  /** Default worker keep-alive time: one minute, expressed in milliseconds. */
  private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1);

  /**
   * Configuration key for the TTL after which a completed procedure is considered expired. The
   * value is passed to {@code CompletedProcedureRetainer#isExpired} when force updating.
   */
  public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
  /** Default for {@link #EVICT_TTL_CONF_KEY}, in milliseconds. */
  static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min

  /**
   * Configuration key for the second TTL passed to {@code CompletedProcedureRetainer#isExpired};
   * judging by the key name it applies to procedures whose result was already acked (the
   * retainer is not visible here, confirm there).
   */
  public static final String EVICT_ACKED_TTL_CONF_KEY = "hbase.procedure.cleaner.acked.evict.ttl";
  /** Default for {@link #EVICT_ACKED_TTL_CONF_KEY}, in milliseconds. */
  static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min

  /**
   * {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to break
   * the PE by having it fail at various junctures. When non-null, testing is set to an instance of
   * the internal {@link Testing} class below, with flags set for the particular test.
   */
  volatile Testing testing = null;
099
100  /**
101   * Class with parameters describing how to fail/die when in testing-context.
102   */
103  public static class Testing {
104    protected volatile boolean killIfHasParent = true;
105    protected volatile boolean killIfSuspended = false;
106
107    /**
108     * Kill the PE BEFORE we store state to the WAL. Good for figuring out if a Procedure is
109     * persisting all the state it needs to recover after a crash.
110     */
111    protected volatile boolean killBeforeStoreUpdate = false;
112    protected volatile boolean toggleKillBeforeStoreUpdate = false;
113
114    /**
115     * Set when we want to fail AFTER state has been stored into the WAL. Rarely used. HBASE-20978
116     * is about a case where memory-state was being set after store to WAL where a crash could cause
117     * us to get stuck. This flag allows killing at what was a vulnerable time.
118     */
119    protected volatile boolean killAfterStoreUpdate = false;
120    protected volatile boolean toggleKillAfterStoreUpdate = false;
121
122    protected volatile boolean killBeforeStoreUpdateInRollback = false;
123    protected volatile boolean toggleKillBeforeStoreUpdateInRollback = false;
124
125    protected boolean shouldKillBeforeStoreUpdate() {
126      final boolean kill = this.killBeforeStoreUpdate;
127      if (this.toggleKillBeforeStoreUpdate) {
128        this.killBeforeStoreUpdate = !kill;
129        LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate);
130      }
131      return kill;
132    }
133
134    protected boolean shouldKillBeforeStoreUpdate(boolean isSuspended, boolean hasParent) {
135      if (isSuspended && !killIfSuspended) {
136        return false;
137      }
138      if (hasParent && !killIfHasParent) {
139        return false;
140      }
141      return shouldKillBeforeStoreUpdate();
142    }
143
144    protected boolean shouldKillAfterStoreUpdate() {
145      final boolean kill = this.killAfterStoreUpdate;
146      if (this.toggleKillAfterStoreUpdate) {
147        this.killAfterStoreUpdate = !kill;
148        LOG.warn("Toggle KILL after store update to: " + this.killAfterStoreUpdate);
149      }
150      return kill;
151    }
152
153    protected boolean shouldKillAfterStoreUpdate(final boolean isSuspended) {
154      return (isSuspended && !killIfSuspended) ? false : shouldKillAfterStoreUpdate();
155    }
156
157    protected boolean shouldKillBeforeStoreUpdateInRollback() {
158      final boolean kill = this.killBeforeStoreUpdateInRollback;
159      if (this.toggleKillBeforeStoreUpdateInRollback) {
160        this.killBeforeStoreUpdateInRollback = !kill;
161        LOG.warn("Toggle KILL before store update in rollback to: "
162          + this.killBeforeStoreUpdateInRollback);
163      }
164      return kill;
165    }
166  }
167
  /**
   * Callback interface for observers of procedure lifecycle events. All callbacks receive the id
   * of the procedure concerned.
   */
  public interface ProcedureExecutorListener {
    /**
     * Presumably invoked for a root procedure that was reloaded from the store and is about to be
     * rescheduled (the dispatching code is outside this view, confirm there).
     * @param procId id of the loaded procedure
     */
    void procedureLoaded(long procId);

    /**
     * Presumably invoked when a new procedure has been added to the executor (dispatch not
     * visible here).
     * @param procId id of the added procedure
     */
    void procedureAdded(long procId);

    /**
     * Presumably invoked when a procedure has finished (dispatch not visible here).
     * @param procId id of the finished procedure
     */
    void procedureFinished(long procId);
  }
175
  /**
   * Maps the procId returned by submitProcedure(), the Root-ProcID, to the Procedure. Once a
   * Root-Procedure completes (success or failure), the result will be added to this map. The user
   * of ProcedureExecutor should call getResult(procId) to get the result.
   */
  private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed =
    new ConcurrentHashMap<>();

  /**
   * Maps the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
   * The RootProcedureState contains the execution stack of the Root-Procedure. It is added to the
   * map by submitProcedure() and removed on procedure completion.
   */
  private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack =
    new ConcurrentHashMap<>();

  /**
   * Helper map to look up the live procedures by ID. This map contains every procedure:
   * root-procedures and subprocedures.
   */
  private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures =
    new ConcurrentHashMap<>();

  /**
   * Helper map to look up whether a procedure was already issued by the same client. This map
   * contains every root procedure.
   */
  private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>();

  /** Registered lifecycle listeners. */
  private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
    new CopyOnWriteArrayList<>();
207
  /**
   * Current configuration. Set in the constructor and replaced by
   * {@link #refreshConfiguration(Configuration)}.
   */
  private Configuration conf;

  /**
   * Created in the {@link #init(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing
   * resource handling rather than observing in a #join is unexpected). Overridden when we do the
   * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok).
   */
  private ThreadGroup threadGroup;

  /**
   * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
   * resource handling rather than observing in a #join is unexpected). Overridden when we do the
   * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok).
   */
  private CopyOnWriteArrayList<WorkerThread> workerThreads;

  /**
   * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
   * resource handling rather than observing in a #join is unexpected). Overridden when we do the
   * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok).
   */
  private TimeoutExecutorThread<TEnvironment> timeoutExecutor;

  /**
   * The WorkerMonitor checks for stuck workers and creates new worker threads when necessary. For
   * example, if there is no worker available to assign meta, it will create a new worker thread
   * for it, so it is very important. The TimeoutExecutor executes many tasks, such as
   * DeadServerMetricRegionChore, RegionInTransitionChore and so on; some of them may run for a
   * long time and would block other tasks like the WorkerMonitor, so a dedicated thread is used
   * for executing the WorkerMonitor.
   */
  private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor;

  /** Number of worker threads created by {@link #init(int, boolean)}. */
  private int corePoolSize;
  /** Maximum (burst) worker count; {@link #init(int, boolean)} sets it to 10x the core size. */
  private int maxPoolSize;

  /** Worker keep-alive time, stored in milliseconds. */
  private volatile long keepAliveTime;

  /**
   * Scheduler/Queue that contains runnable procedures.
   */
  private final ProcedureScheduler scheduler;

  /**
   * Single daemon thread on which the store's force-update requests are processed, so that the
   * store listener callback itself only has to enqueue work.
   */
  private final Executor forceUpdateExecutor = Executors.newSingleThreadExecutor(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build());

  /** Last known procedure id; -1 until the store reports the max id on load. */
  private final AtomicLong lastProcId = new AtomicLong(-1);
  /** Counter reset to 0 in init(); presumably used to number worker threads (usage not shown). */
  private final AtomicLong workerId = new AtomicLong(0);
  /** Value returned by {@link #getActiveExecutorCount()}. */
  private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
  /** True between {@link #startWorkers()} and {@link #stop()}. */
  private final AtomicBoolean running = new AtomicBoolean(false);
  /** Environment object handed to procedures, e.g. when restoring locks or replaying. */
  private final TEnvironment environment;
  /** Store used to persist and reload procedure state. */
  private final ProcedureStore store;

  /** Value of {@link #CHECK_OWNER_SET_CONF_KEY} captured at construction time. */
  private final boolean checkOwnerSet;

  // To prevent concurrent execution of the same procedure.
  // For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the
  // procedure is woken up before we finish the suspend which causes the same procedures to be
  // executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also
  // a bit confusing to the developers. So here we introduce this lock to prevent the concurrent
  // execution of the same procedure.
  private final IdLock procExecutionLock = new IdLock();
269
  /**
   * Creates an executor that schedules procedures with a new {@link SimpleProcedureScheduler}.
   * @param conf        configuration to read executor settings from
   * @param environment environment object handed to the procedures
   * @param store       store used to persist procedure state
   */
  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
    final ProcedureStore store) {
    // Delegate to the main constructor with the default scheduler implementation.
    this(conf, environment, store, new SimpleProcedureScheduler());
  }
274
275  private boolean isRootFinished(Procedure<?> proc) {
276    Procedure<?> rootProc = procedures.get(proc.getRootProcId());
277    return rootProc == null || rootProc.isFinished();
278  }
279
280  private void forceUpdateProcedure(long procId) throws IOException {
281    IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
282    try {
283      Procedure<TEnvironment> proc = procedures.get(procId);
284      if (proc != null) {
285        if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) {
286          LOG.debug("Procedure {} has already been finished and parent is succeeded,"
287            + " skip force updating", proc);
288          return;
289        }
290      } else {
291        CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
292        if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) {
293          LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
294          return;
295        }
296        long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
297        long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
298        if (retainer.isExpired(EnvironmentEdgeManager.currentTime(), evictTtl, evictAckTtl)) {
299          LOG.debug("Procedure {} has already been finished and expired, skip force updating",
300            procId);
301          return;
302        }
303        proc = retainer.getProcedure();
304      }
305      LOG.debug("Force update procedure {}", proc);
306      store.update(proc);
307    } finally {
308      procExecutionLock.releaseLockEntry(lockEntry);
309    }
310  }
311
312  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
313    final ProcedureStore store, final ProcedureScheduler scheduler) {
314    this.environment = environment;
315    this.scheduler = scheduler;
316    this.store = store;
317    this.conf = conf;
318    this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET);
319    refreshConfiguration(conf);
320    store.registerListener(new ProcedureStoreListener() {
321
322      @Override
323      public void forceUpdate(long[] procIds) {
324        Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> {
325          try {
326            forceUpdateProcedure(procId);
327          } catch (IOException e) {
328            LOG.warn("Failed to force update procedure with pid={}", procId);
329          }
330        }));
331      }
332    });
333  }
334
335  private void load(final boolean abortOnCorruption) throws IOException {
336    Preconditions.checkArgument(completed.isEmpty(), "completed not empty: %s", completed);
337    Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty: %s",
338      rollbackStack);
339    Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty: %s", procedures);
340    Preconditions.checkArgument(scheduler.size() == 0, "scheduler queue not empty: %s", scheduler);
341
342    store.load(new ProcedureStore.ProcedureLoader() {
343      @Override
344      public void setMaxProcId(long maxProcId) {
345        assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
346        lastProcId.set(maxProcId);
347      }
348
349      @Override
350      public void load(ProcedureIterator procIter) throws IOException {
351        loadProcedures(procIter);
352      }
353
354      @Override
355      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
356        int corruptedCount = 0;
357        while (procIter.hasNext()) {
358          Procedure<?> proc = procIter.next();
359          LOG.error("Corrupt " + proc);
360          corruptedCount++;
361        }
362        if (abortOnCorruption && corruptedCount > 0) {
363          throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay");
364        }
365      }
366    });
367  }
368
369  private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) {
370    proc.restoreLock(getEnvironment());
371    restored.add(proc.getProcId());
372  }
373
374  private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) {
375    while (!stack.isEmpty()) {
376      restoreLock(stack.pop(), restored);
377    }
378  }
379
380  // Restore the locks for all the procedures.
381  // Notice that we need to restore the locks starting from the root proc, otherwise there will be
382  // problem that a sub procedure may hold the exclusive lock first and then we are stuck when
383  // calling the acquireLock method for the parent procedure.
384  // The algorithm is straight-forward:
385  // 1. Use a set to record the procedures which locks have already been restored.
386  // 2. Use a stack to store the hierarchy of the procedures
387  // 3. For all the procedure, we will first try to find its parent and push it into the stack,
388  // unless
389  // a. We have no parent, i.e, we are the root procedure
390  // b. The lock has already been restored(by checking the set introduced in #1)
391  // then we start to pop the stack and call acquireLock for each procedure.
392  // Notice that this should be done for all procedures, not only the ones in runnableList.
393  private void restoreLocks() {
394    Set<Long> restored = new HashSet<>();
395    Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>();
396    procedures.values().forEach(proc -> {
397      for (;;) {
398        if (restored.contains(proc.getProcId())) {
399          restoreLocks(stack, restored);
400          return;
401        }
402        if (!proc.hasParent()) {
403          restoreLock(proc, restored);
404          restoreLocks(stack, restored);
405          return;
406        }
407        stack.push(proc);
408        proc = procedures.get(proc.getParentProcId());
409      }
410    });
411  }
412
413  private void initializeStacks(ProcedureIterator procIter,
414    List<Procedure<TEnvironment>> runnableList, List<Procedure<TEnvironment>> failedList,
415    List<Procedure<TEnvironment>> waitingList, List<Procedure<TEnvironment>> waitingTimeoutList)
416    throws IOException {
417    procIter.reset();
418    while (procIter.hasNext()) {
419      if (procIter.isNextFinished()) {
420        procIter.skipNext();
421        continue;
422      }
423
424      @SuppressWarnings("unchecked")
425      Procedure<TEnvironment> proc = procIter.next();
426      assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
427      LOG.debug("Loading {}", proc);
428      Long rootProcId = getRootProcedureId(proc);
429      // The orphan procedures will be passed to handleCorrupted, so add an assert here
430      assert rootProcId != null;
431
432      if (proc.hasParent()) {
433        Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId());
434        if (parent != null && !proc.isFinished()) {
435          parent.incChildrenLatch();
436        }
437      }
438
439      RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
440      procStack.loadStack(proc);
441
442      proc.setRootProcId(rootProcId);
443      switch (proc.getState()) {
444        case RUNNABLE:
445          runnableList.add(proc);
446          break;
447        case WAITING:
448          waitingList.add(proc);
449          break;
450        case WAITING_TIMEOUT:
451          waitingTimeoutList.add(proc);
452          break;
453        case FAILED:
454          failedList.add(proc);
455          break;
456        case ROLLEDBACK:
457        case INITIALIZING:
458          String msg = "Unexpected " + proc.getState() + " state for " + proc;
459          LOG.error(msg);
460          throw new UnsupportedOperationException(msg);
461        default:
462          break;
463      }
464    }
465    rollbackStack.forEach((rootProcId, procStack) -> {
466      if (procStack.getSubproceduresStack() != null) {
467        // if we have already record some stack ids, it means we support rollback
468        procStack.setRollbackSupported(true);
469      } else {
470        // otherwise, test the root procedure to see if we support rollback
471        procStack.setRollbackSupported(procedures.get(rootProcId).isRollbackSupported());
472      }
473    });
474  }
475
476  private void processWaitingProcedures(List<Procedure<TEnvironment>> waitingList,
477    List<Procedure<TEnvironment>> runnableList) {
478    waitingList.forEach(proc -> {
479      if (!proc.hasChildren()) {
480        // Normally, WAITING procedures should be waken by its children. But, there is a case that,
481        // all the children are successful and before they can wake up their parent procedure, the
482        // master was killed. So, during recovering the procedures from ProcedureWal, its children
483        // are not loaded because of their SUCCESS state. So we need to continue to run this WAITING
484        // procedure. But before executing, we need to set its state to RUNNABLE, otherwise, a
485        // exception will throw:
486        // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
487        // "NOT RUNNABLE! " + procedure.toString());
488        proc.setState(ProcedureState.RUNNABLE);
489        runnableList.add(proc);
490      } else {
491        proc.afterReplay(getEnvironment());
492      }
493    });
494  }
495
496  private void processWaitingTimeoutProcedures(List<Procedure<TEnvironment>> waitingTimeoutList) {
497    waitingTimeoutList.forEach(proc -> {
498      proc.afterReplay(getEnvironment());
499      timeoutExecutor.add(proc);
500    });
501  }
502
503  private void pushProceduresAfterLoad(List<Procedure<TEnvironment>> runnableList,
504    List<Procedure<TEnvironment>> failedList) {
505    failedList.forEach(scheduler::addBack);
506    runnableList.forEach(p -> {
507      p.afterReplay(getEnvironment());
508      if (!p.hasParent()) {
509        sendProcedureLoadedNotification(p.getProcId());
510      }
511      scheduler.addBack(p);
512    });
513  }
514
515  private void loadProcedures(ProcedureIterator procIter) throws IOException {
516    // 1. Build the rollback stack
517    int runnableCount = 0;
518    int failedCount = 0;
519    int waitingCount = 0;
520    int waitingTimeoutCount = 0;
521    while (procIter.hasNext()) {
522      boolean finished = procIter.isNextFinished();
523      @SuppressWarnings("unchecked")
524      Procedure<TEnvironment> proc = procIter.next();
525      NonceKey nonceKey = proc.getNonceKey();
526      long procId = proc.getProcId();
527
528      if (finished) {
529        completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc));
530        LOG.debug("Completed {}", proc);
531      } else {
532        if (!proc.hasParent()) {
533          assert !proc.isFinished() : "unexpected finished procedure";
534          rollbackStack.put(proc.getProcId(), new RootProcedureState<>());
535        }
536
537        // add the procedure to the map
538        proc.beforeReplay(getEnvironment());
539        procedures.put(proc.getProcId(), proc);
540        switch (proc.getState()) {
541          case RUNNABLE:
542            runnableCount++;
543            break;
544          case FAILED:
545            failedCount++;
546            break;
547          case WAITING:
548            waitingCount++;
549            break;
550          case WAITING_TIMEOUT:
551            waitingTimeoutCount++;
552            break;
553          default:
554            break;
555        }
556      }
557
558      if (nonceKey != null) {
559        nonceKeysToProcIdsMap.put(nonceKey, procId); // add the nonce to the map
560      }
561    }
562
563    // 2. Initialize the stacks: In the old implementation, for procedures in FAILED state, we will
564    // push it into the ProcedureScheduler directly to execute the rollback. But this does not work
565    // after we introduce the restore lock stage. For now, when we acquire a xlock, we will remove
566    // the queue from runQueue in scheduler, and then when a procedure which has lock access, for
567    // example, a sub procedure of the procedure which has the xlock, is pushed into the scheduler,
568    // we will add the queue back to let the workers poll from it. The assumption here is that, the
569    // procedure which has the xlock should have been polled out already, so when loading we can not
570    // add the procedure to scheduler first and then call acquireLock, since the procedure is still
571    // in the queue, and since we will remove the queue from runQueue, then no one can poll it out,
572    // then there is a dead lock
573    List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);
574    List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
575    List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);
576    List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount);
577
578    initializeStacks(procIter, runnableList, failedList, waitingList, waitingTimeoutList);
579
580    // 3. Check the waiting procedures to see if some of them can be added to runnable.
581    processWaitingProcedures(waitingList, runnableList);
582
583    // 4. restore locks
584    restoreLocks();
585
586    // 5. Push the procedures to the timeout executor
587    processWaitingTimeoutProcedures(waitingTimeoutList);
588
589    // 6. Push the procedure to the scheduler
590    pushProceduresAfterLoad(runnableList, failedList);
591    // After all procedures put into the queue, signal the worker threads.
592    // Otherwise, there is a race condition. See HBASE-21364.
593    scheduler.signalAll();
594  }
595
596  /**
597   * Initialize the procedure executor, but do not start workers. We will start them later.
598   * <p/>
599   * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
600   * ensure a single executor, and start the procedure replay to resume and recover the previous
601   * pending and in-progress procedures.
602   * @param numThreads        number of threads available for procedure execution.
603   * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
604   *                          is found on replay. otherwise false.
605   */
606  public void init(int numThreads, boolean abortOnCorruption) throws IOException {
607    // We have numThreads executor + one timer thread used for timing out
608    // procedures and triggering periodic procedures.
609    this.corePoolSize = numThreads;
610    this.maxPoolSize = 10 * numThreads;
611    LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}",
612      corePoolSize, maxPoolSize);
613
614    this.threadGroup = new ThreadGroup("PEWorkerGroup");
615    this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout");
616    this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor");
617
618    // Create the workers
619    workerId.set(0);
620    workerThreads = new CopyOnWriteArrayList<>();
621    for (int i = 0; i < corePoolSize; ++i) {
622      workerThreads.add(new WorkerThread(threadGroup));
623    }
624
625    long st, et;
626
627    // Acquire the store lease.
628    st = System.nanoTime();
629    store.recoverLease();
630    et = System.nanoTime();
631    LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(),
632      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
633
634    // start the procedure scheduler
635    scheduler.start();
636
637    // TODO: Split in two steps.
638    // TODO: Handle corrupted procedures (currently just a warn)
639    // The first one will make sure that we have the latest id,
640    // so we can start the threads and accept new procedures.
641    // The second step will do the actual load of old procedures.
642    st = System.nanoTime();
643    load(abortOnCorruption);
644    et = System.nanoTime();
645    LOG.info("Loaded {} in {}", store.getClass().getSimpleName(),
646      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
647  }
648
649  /**
650   * Start the workers.
651   */
652  public void startWorkers() throws IOException {
653    if (!running.compareAndSet(false, true)) {
654      LOG.warn("Already running");
655      return;
656    }
657    // Start the executors. Here we must have the lastProcId set.
658    LOG.trace("Start workers {}", workerThreads.size());
659    timeoutExecutor.start();
660    workerMonitorExecutor.start();
661    for (WorkerThread worker : workerThreads) {
662      worker.start();
663    }
664
665    // Internal chores
666    workerMonitorExecutor.add(new WorkerMonitor());
667
668    // Add completed cleaner chore
669    addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,
670      nonceKeysToProcIdsMap));
671  }
672
673  public void stop() {
674    if (!running.getAndSet(false)) {
675      return;
676    }
677
678    LOG.info("Stopping");
679    scheduler.stop();
680    timeoutExecutor.sendStopSignal();
681    workerMonitorExecutor.sendStopSignal();
682  }
683
684  public void join() {
685    assert !isRunning() : "expected not running";
686
687    // stop the timeout executor
688    timeoutExecutor.awaitTermination();
689    // stop the work monitor executor
690    workerMonitorExecutor.awaitTermination();
691
692    // stop the worker threads
693    for (WorkerThread worker : workerThreads) {
694      worker.awaitTermination();
695    }
696
697    // Destroy the Thread Group for the executors
698    // TODO: Fix. #join is not place to destroy resources.
699    try {
700      threadGroup.destroy();
701    } catch (IllegalThreadStateException e) {
702      LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT", this.threadGroup,
703        e.getMessage());
704      // This dumps list of threads on STDOUT.
705      this.threadGroup.list();
706    }
707
708    // reset the in-memory state for testing
709    completed.clear();
710    rollbackStack.clear();
711    procedures.clear();
712    nonceKeysToProcIdsMap.clear();
713    scheduler.clear();
714    lastProcId.set(-1);
715  }
716
717  public void refreshConfiguration(final Configuration conf) {
718    this.conf = conf;
719    setKeepAliveTime(conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY, DEFAULT_WORKER_KEEP_ALIVE_TIME),
720      TimeUnit.MILLISECONDS);
721  }
722
723  // ==========================================================================
724  // Accessors
725  // ==========================================================================
  /** Returns true after {@link #startWorkers()} succeeded and until {@link #stop()} is called. */
  public boolean isRunning() {
    return running.get();
  }

  /** Returns the current number of worker threads. */
  public int getWorkerThreadCount() {
    return workerThreads.size();
  }

  /** Returns the core pool size settings. */
  public int getCorePoolSize() {
    return corePoolSize;
  }

  /** Returns the current value of the active executor counter. */
  public int getActiveExecutorCount() {
    return activeExecutorCount.get();
  }

  /** Returns the environment object this executor was created with. */
  public TEnvironment getEnvironment() {
    return this.environment;
  }

  /** Returns the procedure store this executor was created with. */
  public ProcedureStore getStore() {
    return this.store;
  }

  /** Returns the scheduler holding the runnable procedures. */
  ProcedureScheduler getScheduler() {
    return scheduler;
  }
755
756  public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) {
757    this.keepAliveTime = timeUnit.toMillis(keepAliveTime);
758    this.scheduler.signalAll();
759  }
760
761  public long getKeepAliveTime(final TimeUnit timeUnit) {
762    return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS);
763  }
764
765  // ==========================================================================
766  // Submit/Remove Chores
767  // ==========================================================================
768
769  /**
770   * Add a chore procedure to the executor
771   * @param chore the chore to add
772   */
773  public void addChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore) {
774    if (chore == null) {
775      return;
776    }
777    chore.setState(ProcedureState.WAITING_TIMEOUT);
778    timeoutExecutor.add(chore);
779  }
780
781  /**
782   * Remove a chore procedure from the executor
783   * @param chore the chore to remove
784   * @return whether the chore is removed, or it will be removed later
785   */
786  public boolean removeChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore) {
787    if (chore == null) {
788      return true;
789    }
790    chore.setState(ProcedureState.SUCCESS);
791    return timeoutExecutor.remove(chore);
792  }
793
794  // ==========================================================================
795  // Nonce Procedure helpers
796  // ==========================================================================
797  /**
798   * Create a NonceKey from the specified nonceGroup and nonce.
799   * @param nonceGroup the group to use for the {@link NonceKey}
800   * @param nonce      the nonce to use in the {@link NonceKey}
801   * @return the generated NonceKey
802   */
803  public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
804    return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce);
805  }
806
807  /**
808   * Register a nonce for a procedure that is going to be submitted. A procId will be reserved and
809   * on submitProcedure(), the procedure with the specified nonce will take the reserved ProcId. If
810   * someone already reserved the nonce, this method will return the procId reserved, otherwise an
811   * invalid procId will be returned. and the caller should procede and submit the procedure.
812   * @param nonceKey A unique identifier for this operation from the client or process.
813   * @return the procId associated with the nonce, if any otherwise an invalid procId.
814   */
815  public long registerNonce(final NonceKey nonceKey) {
816    if (nonceKey == null) {
817      return -1;
818    }
819
820    // check if we have already a Reserved ID for the nonce
821    Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey);
822    if (oldProcId == null) {
823      // reserve a new Procedure ID, this will be associated with the nonce
824      // and the procedure submitted with the specified nonce will use this ID.
825      final long newProcId = nextProcId();
826      oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId);
827      if (oldProcId == null) {
828        return -1;
829      }
830    }
831
832    // we found a registered nonce, but the procedure may not have been submitted yet.
833    // since the client expect the procedure to be submitted, spin here until it is.
834    final boolean traceEnabled = LOG.isTraceEnabled();
835    while (
836      isRunning() && !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId))
837        && nonceKeysToProcIdsMap.containsKey(nonceKey)
838    ) {
839      if (traceEnabled) {
840        LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted");
841      }
842      Threads.sleep(100);
843    }
844    return oldProcId.longValue();
845  }
846
847  /**
848   * Remove the NonceKey if the procedure was not submitted to the executor.
849   * @param nonceKey A unique identifier for this operation from the client or process.
850   */
851  public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) {
852    if (nonceKey == null) {
853      return;
854    }
855
856    final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
857    if (procId == null) {
858      return;
859    }
860
861    // if the procedure was not submitted, remove the nonce
862    if (!(procedures.containsKey(procId) || completed.containsKey(procId))) {
863      nonceKeysToProcIdsMap.remove(nonceKey);
864    }
865  }
866
867  /**
868   * If the failure failed before submitting it, we may want to give back the same error to the
869   * requests with the same nonceKey.
870   * @param nonceKey  A unique identifier for this operation from the client or process
871   * @param procName  name of the procedure, used to inform the user
872   * @param procOwner name of the owner of the procedure, used to inform the user
873   * @param exception the failure to report to the user
874   */
875  public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner,
876    IOException exception) {
877    if (nonceKey == null) {
878      return;
879    }
880
881    Long procId = nonceKeysToProcIdsMap.get(nonceKey);
882    if (procId == null || completed.containsKey(procId)) {
883      return;
884    }
885
886    completed.computeIfAbsent(procId, (key) -> {
887      Procedure<TEnvironment> proc =
888        new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception);
889
890      return new CompletedProcedureRetainer<>(proc);
891    });
892  }
893
894  // ==========================================================================
895  // Submit/Abort Procedure
896  // ==========================================================================
897  /**
898   * Add a new root-procedure to the executor.
899   * @param proc the new procedure to execute.
900   * @return the procedure id, that can be used to monitor the operation
901   */
902  public long submitProcedure(Procedure<TEnvironment> proc) {
903    return submitProcedure(proc, null);
904  }
905
906  /**
907   * Bypass a procedure. If the procedure is set to bypass, all the logic in execute/rollback will
908   * be ignored and it will return success, whatever. It is used to recover buggy stuck procedures,
909   * releasing the lock resources and letting other procedures run. Bypassing one procedure (and its
910   * ancestors will be bypassed automatically) may leave the cluster in a middle state, e.g. region
911   * not assigned, or some hdfs files left behind. After getting rid of those stuck procedures, the
912   * operators may have to do some clean up on hdfs or schedule some assign procedures to let region
913   * online. DO AT YOUR OWN RISK.
914   * <p>
915   * A procedure can be bypassed only if 1. The procedure is in state of RUNNABLE, WAITING,
916   * WAITING_TIMEOUT or it is a root procedure without any child. 2. No other worker thread is
917   * executing it 3. No child procedure has been submitted
918   * <p>
919   * If all the requirements are meet, the procedure and its ancestors will be bypassed and
920   * persisted to WAL.
921   * <p>
922   * If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue. TODO: What
923   * about WAITING_TIMEOUT?
924   * @param pids      the procedure id
925   * @param lockWait  time to wait lock
926   * @param force     if force set to true, we will bypass the procedure even if it is executing.
927   *                  This is for procedures which can't break out during executing(due to bug,
928   *                  mostly) In this case, bypassing the procedure is not enough, since it is
929   *                  already stuck there. We need to restart the master after bypassing, and
930   *                  letting the problematic procedure to execute wth bypass=true, so in that
931   *                  condition, the procedure can be successfully bypassed.
932   * @param recursive We will do an expensive search for children of each pid. EXPENSIVE!
933   * @return true if bypass success
934   * @throws IOException IOException
935   */
936  public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force,
937    boolean recursive) throws IOException {
938    List<Boolean> result = new ArrayList<Boolean>(pids.size());
939    for (long pid : pids) {
940      result.add(bypassProcedure(pid, lockWait, force, recursive));
941    }
942    return result;
943  }
944
  /**
   * Bypass a single procedure together with its ancestors.
   * <p>
   * The method returns {@code false} without bypassing anything when: the procedure is not among
   * the active procedures; its execution lock could not be taken within {@code lockWait} and
   * {@code override} is false; it is already finished; it has children and {@code recursive} is
   * false; or it has a parent and is not in RUNNABLE, WAITING or WAITING_TIMEOUT state.
   * <p>
   * Otherwise the procedure and each ancestor found through {@link #getProcedure(long)} are
   * marked as bypassed and written to the store, and the procedure is re-queued unless it is
   * still being executed by someone else (no lock entry obtained).
   * @param pid       the id of the procedure to bypass
   * @param lockWait  how long to wait for the procedure execution lock; must be positive
   * @param override  when true, continue with the bypass even if the execution lock could not be
   *                  obtained
   * @param recursive when true and the procedure has children, each child is bypassed first;
   *                  the outcome of a child bypass is not checked and an IOException from it is
   *                  only logged
   * @return true if the procedure was marked as bypassed, false otherwise
   * @throws IOException declared by the lock and store calls used here
   */
  boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive)
    throws IOException {
    Preconditions.checkArgument(lockWait > 0, "lockWait should be positive");
    final Procedure<TEnvironment> procedure = getProcedure(pid);
    if (procedure == null) {
      LOG.debug("Procedure pid={} does not exist, skipping bypass", pid);
      return false;
    }

    LOG.debug("Begin bypass {} with lockWait={}, override={}, recursive={}", procedure, lockWait,
      override, recursive);
    // A null lockEntry means the lock was not obtained; the log messages below treat that as the
    // procedure still being executed.
    IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait);
    if (lockEntry == null && !override) {
      LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}", lockWait,
        procedure, override);
      return false;
    } else if (lockEntry == null) {
      // override is set: carry on without holding the lock.
      LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}", lockWait,
        procedure, override);
    }
    try {
      // check whether the procedure is already finished
      if (procedure.isFinished()) {
        LOG.debug("{} is already finished, skipping bypass", procedure);
        return false;
      }

      if (procedure.hasChildren()) {
        if (recursive) {
          // EXPENSIVE. Checks each live procedure of which there could be many!!!
          // Is there another way to get children of a procedure?
          LOG.info("Recursive bypass on children of pid={}", procedure.getProcId());
          this.procedures.forEachValue(1 /* Single-threaded */,
            // Transformer
            v -> v.getParentProcId() == procedure.getProcId() ? v : null,
            // Consumer
            v -> {
              try {
                bypassProcedure(v.getProcId(), lockWait, override, recursive);
              } catch (IOException e) {
                LOG.warn("Recursive bypass of pid={}", v.getProcId(), e);
              }
            });
        } else {
          LOG.debug("{} has children, skipping bypass", procedure);
          return false;
        }
      }

      // A procedure without a parent passes this check in whatever state it is in. A procedure
      // that has a parent is only bypassed while RUNNABLE, WAITING or WAITING_TIMEOUT.
      if (
        procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE
          && procedure.getState() != ProcedureState.WAITING
          && procedure.getState() != ProcedureState.WAITING_TIMEOUT
      ) {
        LOG.debug("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states "
          + "(with no parent), {}", procedure);
        // Question: how is the bypass done here?
        // NOTE(review): nothing is bypassed on this path; the method just returns false.
        return false;
      }

      // Now, the procedure is not finished, and no one can execute it since we take the lock now
      // And we can be sure that its ancestor is not running too, since their child has not
      // finished yet
      // NOTE(review): with override=true the lock may NOT be held here (lockEntry == null).
      Procedure<TEnvironment> current = procedure;
      // Walk up the parent chain until getProcedure() finds no procedure for the parent id.
      while (current != null) {
        LOG.debug("Bypassing {}", current);
        current.bypass(getEnvironment());
        store.update(current);
        long parentID = current.getParentProcId();
        current = getProcedure(parentID);
      }

      // wake up waiting procedure, already checked there is no child
      if (procedure.getState() == ProcedureState.WAITING) {
        procedure.setState(ProcedureState.RUNNABLE);
        store.update(procedure);
      }

      // If the state of the procedure is WAITING_TIMEOUT, it is not added to the scheduler
      // directly. Instead it is removed from the timeout executor queue and handed to
      // executeTimedoutProcedure().
      if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
        LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure);
        if (timeoutExecutor.remove(procedure)) {
          LOG.debug("removed procedure {} from timeoutExecutor", procedure);
          timeoutExecutor.executeTimedoutProcedure(procedure);
        }
      } else if (lockEntry != null) {
        scheduler.addFront(procedure);
        LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure);
      } else {
        // If we don't have the lock, we can't re-submit the queue,
        // since it is already executing. To get rid of the stuck situation, we
        // need to restart the master. With the procedure set to bypass, the procedureExecutor
        // will bypass it and won't get stuck again.
        LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, "
          + "skipping add to queue", procedure);
      }
      return true;

    } finally {
      // Release the execution lock only if this call actually obtained it.
      if (lockEntry != null) {
        procExecutionLock.releaseLockEntry(lockEntry);
      }
    }
  }
1051
1052  /**
1053   * Add a new root-procedure to the executor.
1054   * @param proc     the new procedure to execute.
1055   * @param nonceKey the registered unique identifier for this operation from the client or process.
1056   * @return the procedure id, that can be used to monitor the operation
1057   */
1058  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
1059      justification = "FindBugs is blind to the check-for-null")
1060  public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) {
1061    Preconditions.checkArgument(lastProcId.get() >= 0);
1062
1063    prepareProcedure(proc);
1064
1065    final Long currentProcId;
1066    if (nonceKey != null) {
1067      currentProcId = nonceKeysToProcIdsMap.get(nonceKey);
1068      Preconditions.checkArgument(currentProcId != null,
1069        "Expected nonceKey=" + nonceKey + " to be reserved, use registerNonce(); proc=" + proc);
1070    } else {
1071      currentProcId = nextProcId();
1072    }
1073
1074    // Initialize the procedure
1075    proc.setNonceKey(nonceKey);
1076    proc.setProcId(currentProcId.longValue());
1077
1078    // Commit the transaction
1079    store.insert(proc, null);
1080    LOG.debug("Stored {}", proc);
1081
1082    // Add the procedure to the executor
1083    return pushProcedure(proc);
1084  }
1085
1086  /**
1087   * Add a set of new root-procedure to the executor.
1088   * @param procs the new procedures to execute.
1089   */
1090  // TODO: Do we need to take nonces here?
1091  public void submitProcedures(Procedure<TEnvironment>[] procs) {
1092    Preconditions.checkArgument(lastProcId.get() >= 0);
1093    if (procs == null || procs.length <= 0) {
1094      return;
1095    }
1096
1097    // Prepare procedure
1098    for (int i = 0; i < procs.length; ++i) {
1099      prepareProcedure(procs[i]).setProcId(nextProcId());
1100    }
1101
1102    // Commit the transaction
1103    store.insert(procs);
1104    if (LOG.isDebugEnabled()) {
1105      LOG.debug("Stored " + Arrays.toString(procs));
1106    }
1107
1108    // Add the procedure to the executor
1109    for (int i = 0; i < procs.length; ++i) {
1110      pushProcedure(procs[i]);
1111    }
1112  }
1113
1114  private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) {
1115    Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
1116    Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
1117    if (this.checkOwnerSet) {
1118      Preconditions.checkArgument(proc.hasOwner(), "missing owner");
1119    }
1120    return proc;
1121  }
1122
1123  private long pushProcedure(Procedure<TEnvironment> proc) {
1124    final long currentProcId = proc.getProcId();
1125
1126    // Update metrics on start of a procedure
1127    proc.updateMetricsOnSubmit(getEnvironment());
1128
1129    // Create the rollback stack for the procedure
1130    RootProcedureState<TEnvironment> stack = new RootProcedureState<>();
1131    stack.setRollbackSupported(proc.isRollbackSupported());
1132    rollbackStack.put(currentProcId, stack);
1133
1134    // Submit the new subprocedures
1135    assert !procedures.containsKey(currentProcId);
1136    procedures.put(currentProcId, proc);
1137    sendProcedureAddedNotification(currentProcId);
1138    scheduler.addBack(proc);
1139    return proc.getProcId();
1140  }
1141
1142  /**
1143   * Send an abort notification the specified procedure. Depending on the procedure implementation
1144   * the abort can be considered or ignored.
1145   * @param procId the procedure to abort
1146   * @return true if the procedure exists and has received the abort, otherwise false.
1147   */
1148  public boolean abort(long procId) {
1149    return abort(procId, true);
1150  }
1151
1152  /**
1153   * Send an abort notification to the specified procedure. Depending on the procedure
1154   * implementation, the abort can be considered or ignored.
1155   * @param procId                the procedure to abort
1156   * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
1157   * @return true if the procedure exists and has received the abort, otherwise false.
1158   */
1159  public boolean abort(long procId, boolean mayInterruptIfRunning) {
1160    Procedure<TEnvironment> proc = procedures.get(procId);
1161    if (proc != null) {
1162      if (!mayInterruptIfRunning && proc.wasExecuted()) {
1163        return false;
1164      }
1165      return proc.abort(getEnvironment());
1166    }
1167    return false;
1168  }
1169
1170  // ==========================================================================
1171  // Executor query helpers
1172  // ==========================================================================
1173  public Procedure<TEnvironment> getProcedure(final long procId) {
1174    return procedures.get(procId);
1175  }
1176
1177  public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) {
1178    Procedure<TEnvironment> proc = getProcedure(procId);
1179    if (clazz.isInstance(proc)) {
1180      return clazz.cast(proc);
1181    }
1182    return null;
1183  }
1184
1185  public Procedure<TEnvironment> getResult(long procId) {
1186    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1187    if (retainer == null) {
1188      return null;
1189    } else {
1190      return retainer.getProcedure();
1191    }
1192  }
1193
1194  /**
1195   * Return true if the procedure is finished. The state may be "completed successfully" or "failed
1196   * and rolledback". Use getResult() to check the state or get the result data.
1197   * @param procId the ID of the procedure to check
1198   * @return true if the procedure execution is finished, otherwise false.
1199   */
1200  public boolean isFinished(final long procId) {
1201    return !procedures.containsKey(procId);
1202  }
1203
1204  /**
1205   * Return true if the procedure is started.
1206   * @param procId the ID of the procedure to check
1207   * @return true if the procedure execution is started, otherwise false.
1208   */
1209  public boolean isStarted(long procId) {
1210    Procedure<?> proc = procedures.get(procId);
1211    if (proc == null) {
1212      return completed.get(procId) != null;
1213    }
1214    return proc.wasExecuted();
1215  }
1216
1217  /**
1218   * Mark the specified completed procedure, as ready to remove.
1219   * @param procId the ID of the procedure to remove
1220   */
1221  public void removeResult(long procId) {
1222    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1223    if (retainer == null) {
1224      assert !procedures.containsKey(procId) : "pid=" + procId + " is still running";
1225      LOG.debug("pid={} already removed by the cleaner.", procId);
1226      return;
1227    }
1228
1229    // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired.
1230    retainer.setClientAckTime(EnvironmentEdgeManager.currentTime());
1231  }
1232
1233  public Procedure<TEnvironment> getResultOrProcedure(long procId) {
1234    CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1235    if (retainer == null) {
1236      return procedures.get(procId);
1237    } else {
1238      return retainer.getProcedure();
1239    }
1240  }
1241
1242  /**
1243   * Check if the user is this procedure's owner
1244   * @param procId the target procedure
1245   * @param user   the user
1246   * @return true if the user is the owner of the procedure, false otherwise or the owner is
1247   *         unknown.
1248   */
1249  public boolean isProcedureOwner(long procId, User user) {
1250    if (user == null) {
1251      return false;
1252    }
1253    final Procedure<TEnvironment> runningProc = procedures.get(procId);
1254    if (runningProc != null) {
1255      return runningProc.getOwner().equals(user.getShortName());
1256    }
1257
1258    final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
1259    if (retainer != null) {
1260      return retainer.getProcedure().getOwner().equals(user.getShortName());
1261    }
1262
1263    // Procedure either does not exist or has already completed and got cleaned up.
1264    // At this time, we cannot check the owner of the procedure
1265    return false;
1266  }
1267
1268  /**
1269   * Should only be used when starting up, where the procedure workers have not been started.
1270   * <p/>
1271   * If the procedure works has been started, the return values maybe changed when you are
1272   * processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as
1273   * it will do a copy, and also include the finished procedures.
1274   */
1275  public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() {
1276    return procedures.values();
1277  }
1278
1279  /**
1280   * Get procedures.
1281   * @return the procedures in a list
1282   */
1283  public List<Procedure<TEnvironment>> getProcedures() {
1284    List<Procedure<TEnvironment>> procedureList =
1285      new ArrayList<>(procedures.size() + completed.size());
1286    procedureList.addAll(procedures.values());
1287    // Note: The procedure could show up twice in the list with different state, as
1288    // it could complete after we walk through procedures list and insert into
1289    // procedureList - it is ok, as we will use the information in the Procedure
1290    // to figure it out; to prevent this would increase the complexity of the logic.
1291    completed.values().stream().map(CompletedProcedureRetainer::getProcedure)
1292      .forEach(procedureList::add);
1293    return procedureList;
1294  }
1295
1296  // ==========================================================================
1297  // Listeners helpers
1298  // ==========================================================================
1299  public void registerListener(ProcedureExecutorListener listener) {
1300    this.listeners.add(listener);
1301  }
1302
1303  public boolean unregisterListener(ProcedureExecutorListener listener) {
1304    return this.listeners.remove(listener);
1305  }
1306
1307  private void sendProcedureLoadedNotification(final long procId) {
1308    if (!this.listeners.isEmpty()) {
1309      for (ProcedureExecutorListener listener : this.listeners) {
1310        try {
1311          listener.procedureLoaded(procId);
1312        } catch (Throwable e) {
1313          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1314        }
1315      }
1316    }
1317  }
1318
1319  private void sendProcedureAddedNotification(final long procId) {
1320    if (!this.listeners.isEmpty()) {
1321      for (ProcedureExecutorListener listener : this.listeners) {
1322        try {
1323          listener.procedureAdded(procId);
1324        } catch (Throwable e) {
1325          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1326        }
1327      }
1328    }
1329  }
1330
1331  private void sendProcedureFinishedNotification(final long procId) {
1332    if (!this.listeners.isEmpty()) {
1333      for (ProcedureExecutorListener listener : this.listeners) {
1334        try {
1335          listener.procedureFinished(procId);
1336        } catch (Throwable e) {
1337          LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
1338        }
1339      }
1340    }
1341  }
1342
1343  // ==========================================================================
1344  // Procedure IDs helpers
1345  // ==========================================================================
1346  private long nextProcId() {
1347    long procId = lastProcId.incrementAndGet();
1348    if (procId < 0) {
1349      while (!lastProcId.compareAndSet(procId, 0)) {
1350        procId = lastProcId.get();
1351        if (procId >= 0) {
1352          break;
1353        }
1354      }
1355      while (procedures.containsKey(procId)) {
1356        procId = lastProcId.incrementAndGet();
1357      }
1358    }
1359    assert procId >= 0 : "Invalid procId " + procId;
1360    return procId;
1361  }
1362
1363  protected long getLastProcId() {
1364    return lastProcId.get();
1365  }
1366
1367  public Set<Long> getActiveProcIds() {
1368    return procedures.keySet();
1369  }
1370
1371  Long getRootProcedureId(Procedure<TEnvironment> proc) {
1372    return Procedure.getRootProcedureId(procedures, proc);
1373  }
1374
1375  // ==========================================================================
1376  // Executions
1377  // ==========================================================================
  /**
   * Runs one scheduling round for the given procedure.
   * <p>
   * Nothing is done if the procedure is already finished or if no rollback stack exists for its
   * root. If the root procedure id cannot be resolved, the procedure is rolled back. Otherwise
   * the method tries to acquire the execution on the root's {@link RootProcedureState}: when that
   * fails it attempts a rollback (of the whole stack, or of just this procedure if it was never
   * executed); when it succeeds it takes the procedure lock, executes the procedure, releases
   * the execution and finalizes the procedure if it succeeded. The round is repeated while the
   * root state reports a failure.
   * @param proc the procedure to execute
   */
  private void executeProcedure(Procedure<TEnvironment> proc) {
    if (proc.isFinished()) {
      LOG.debug("{} is already finished, skipping execution", proc);
      return;
    }
    final Long rootProcId = getRootProcedureId(proc);
    if (rootProcId == null) {
      // The 'proc' was ready to run but the root procedure was rolledback
      LOG.warn("Rollback because parent is done/rolledback proc=" + proc);
      executeRollback(proc);
      return;
    }

    RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
    if (procStack == null) {
      LOG.warn("RootProcedureState is null for " + proc.getProcId());
      return;
    }
    do {
      // Try to acquire the execution
      if (!procStack.acquire(proc)) {
        if (procStack.setRollback()) {
          // we have the 'rollback-lock' we can start rollingback
          switch (executeRollback(rootProcId, procStack)) {
            case LOCK_ACQUIRED:
              break;
            case LOCK_YIELD_WAIT:
              // Give up the rollback flag and put the procedure back on the scheduler.
              procStack.unsetRollback();
              scheduler.yield(proc);
              break;
            case LOCK_EVENT_WAIT:
              LOG.info("LOCK_EVENT_WAIT rollback..." + proc);
              procStack.unsetRollback();
              break;
            default:
              throw new UnsupportedOperationException();
          }
        } else {
          // if we can't rollback means that some child is still running.
          // the rollback will be executed after all the children are done.
          // If the procedure was never executed, remove and mark it as rolledback.
          if (!proc.wasExecuted()) {
            switch (executeRollback(proc)) {
              case LOCK_ACQUIRED:
                break;
              case LOCK_YIELD_WAIT:
                scheduler.yield(proc);
                break;
              case LOCK_EVENT_WAIT:
                LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc);
                break;
              default:
                throw new UnsupportedOperationException();
            }
          }
        }
        // The execution was not acquired, so this round ends here in every case.
        break;
      }

      // Execute the procedure
      assert proc.getState() == ProcedureState.RUNNABLE : proc;
      // Note that lock is NOT about concurrency but rather about ensuring
      // ownership of a procedure of an entity such as a region or table
      LockState lockState = acquireLock(proc);
      switch (lockState) {
        case LOCK_ACQUIRED:
          execProcedure(procStack, proc);
          break;
        case LOCK_YIELD_WAIT:
          LOG.info(lockState + " " + proc);
          scheduler.yield(proc);
          break;
        case LOCK_EVENT_WAIT:
          // Someone will wake us up when the lock is available
          LOG.debug(lockState + " " + proc);
          break;
        default:
          throw new UnsupportedOperationException();
      }
      // The execution is released whatever the lock outcome was.
      procStack.release(proc);

      if (proc.isSuccess()) {
        // update metrics on finishing the procedure
        proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
        LOG.info("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime()));
        // Finalize the procedure state
        // A root procedure is finished as a whole; a sub-procedure only gets its completion
        // cleanup.
        if (proc.getProcId() == rootProcId) {
          procedureFinished(proc);
        } else {
          execCompletionCleanup(proc);
        }
        break;
      }
    } while (procStack.isFailed());
  }
1473
1474  private LockState acquireLock(Procedure<TEnvironment> proc) {
1475    TEnvironment env = getEnvironment();
1476    // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if
1477    // hasLock is true.
1478    if (proc.hasLock()) {
1479      return LockState.LOCK_ACQUIRED;
1480    }
1481    return proc.doAcquireLock(env, store);
1482  }
1483
1484  private void releaseLock(Procedure<TEnvironment> proc, boolean force) {
1485    TEnvironment env = getEnvironment();
1486    // For how the framework works, we know that we will always have the lock
1487    // when we call releaseLock(), so we can avoid calling proc.hasLock()
1488    if (force || !proc.holdLock(env) || proc.isFinished()) {
1489      proc.doReleaseLock(env, store);
1490    }
1491  }
1492
1493  // Returning null means we have already held the execution lock, so you do not need to get the
1494  // lock entry for releasing
1495  private IdLock.Entry getLockEntryForRollback(long procId) {
1496    // Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need
1497    // this check, as the worker will hold the lock before executing a procedure. This is the only
1498    // place where we may hold two procedure execution locks, and there is a fence in the
1499    // RootProcedureState where we can make sure that only one worker can execute the rollback of
1500    // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to
1501    // prevent race between us and the force update thread.
1502    if (!procExecutionLock.isHeldByCurrentThread(procId)) {
1503      try {
1504        return procExecutionLock.getLockEntry(procId);
1505      } catch (IOException e) {
1506        // can only happen if interrupted, so not a big deal to propagate it
1507        throw new UncheckedIOException(e);
1508      }
1509    }
1510    return null;
1511  }
1512
1513  private void executeUnexpectedRollback(Procedure<TEnvironment> rootProc,
1514    RootProcedureState<TEnvironment> procStack) {
1515    if (procStack.getSubprocs() != null) {
1516      // comparing proc id in reverse order, so we will delete later procedures first, otherwise we
1517      // may delete parent procedure first and if we fail in the middle of this operation, when
1518      // loading we will find some orphan procedures
1519      PriorityQueue<Procedure<TEnvironment>> pq =
1520        new PriorityQueue<>(procStack.getSubprocs().size(),
1521          Comparator.<Procedure<TEnvironment>> comparingLong(Procedure::getProcId).reversed());
1522      pq.addAll(procStack.getSubprocs());
1523      for (;;) {
1524        Procedure<TEnvironment> subproc = pq.poll();
1525        if (subproc == null) {
1526          break;
1527        }
1528        if (!procedures.containsKey(subproc.getProcId())) {
1529          // this means it has already been rolledback
1530          continue;
1531        }
1532        IdLock.Entry lockEntry = getLockEntryForRollback(subproc.getProcId());
1533        try {
1534          cleanupAfterRollbackOneStep(subproc);
1535          execCompletionCleanup(subproc);
1536        } finally {
1537          if (lockEntry != null) {
1538            procExecutionLock.releaseLockEntry(lockEntry);
1539          }
1540        }
1541      }
1542    }
1543    IdLock.Entry lockEntry = getLockEntryForRollback(rootProc.getProcId());
1544    try {
1545      cleanupAfterRollbackOneStep(rootProc);
1546    } finally {
1547      if (lockEntry != null) {
1548        procExecutionLock.releaseLockEntry(lockEntry);
1549      }
1550    }
1551  }
1552
1553  private LockState executeNormalRollback(Procedure<TEnvironment> rootProc,
1554    RootProcedureState<TEnvironment> procStack) {
1555    List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack();
1556    assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;
1557
1558    int stackTail = subprocStack.size();
1559    while (stackTail-- > 0) {
1560      Procedure<TEnvironment> proc = subprocStack.get(stackTail);
1561      IdLock.Entry lockEntry = getLockEntryForRollback(proc.getProcId());
1562      try {
1563        // For the sub procedures which are successfully finished, we do not rollback them.
1564        // Typically, if we want to rollback a procedure, we first need to rollback it, and then
1565        // recursively rollback its ancestors. The state changes which are done by sub procedures
1566        // should be handled by parent procedures when rolling back. For example, when rolling back
1567        // a MergeTableProcedure, we will schedule new procedures to bring the offline regions
1568        // online, instead of rolling back the original procedures which offlined the regions(in
1569        // fact these procedures can not be rolled back...).
1570        if (proc.isSuccess()) {
1571          // Just do the cleanup work, without actually executing the rollback
1572          subprocStack.remove(stackTail);
1573          cleanupAfterRollbackOneStep(proc);
1574          continue;
1575        }
1576        LockState lockState = acquireLock(proc);
1577        if (lockState != LockState.LOCK_ACQUIRED) {
1578          // can't take a lock on the procedure, add the root-proc back on the
1579          // queue waiting for the lock availability
1580          return lockState;
1581        }
1582
1583        lockState = executeRollback(proc);
1584        releaseLock(proc, false);
1585        boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;
1586        abortRollback |= !isRunning() || !store.isRunning();
1587
1588        // allows to kill the executor before something is stored to the wal.
1589        // useful to test the procedure recovery.
1590        if (abortRollback) {
1591          return lockState;
1592        }
1593
1594        subprocStack.remove(stackTail);
1595
1596        // if the procedure is kind enough to pass the slot to someone else, yield
1597        // if the proc is already finished, do not yield
1598        if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
1599          return LockState.LOCK_YIELD_WAIT;
1600        }
1601
1602        if (proc != rootProc) {
1603          execCompletionCleanup(proc);
1604        }
1605      } finally {
1606        if (lockEntry != null) {
1607          procExecutionLock.releaseLockEntry(lockEntry);
1608        }
1609      }
1610    }
1611    return LockState.LOCK_ACQUIRED;
1612  }
1613
1614  /**
1615   * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the
1616   * root-procedure will be visible as finished to user, and the result will be the fatal exception.
1617   */
1618  private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) {
1619    Procedure<TEnvironment> rootProc = procedures.get(rootProcId);
1620    RemoteProcedureException exception = rootProc.getException();
1621    // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are
1622    // rolling back because the subprocedure does. Clarify.
1623    if (exception == null) {
1624      exception = procStack.getException();
1625      rootProc.setFailure(exception);
1626      store.update(rootProc);
1627    }
1628
1629    if (procStack.isRollbackSupported()) {
1630      LockState lockState = executeNormalRollback(rootProc, procStack);
1631      if (lockState != LockState.LOCK_ACQUIRED) {
1632        return lockState;
1633      }
1634    } else {
1635      // the procedure does not support rollback, so typically we should not reach here, this
1636      // usually means there are code bugs, let's just wait all the subprocedures to finish and then
1637      // mark the root procedure as failure.
1638      LOG.error(HBaseMarkers.FATAL,
1639        "Root Procedure {} does not support rollback but the execution failed"
1640          + " and try to rollback, code bug?",
1641        rootProc, exception);
1642      executeUnexpectedRollback(rootProc, procStack);
1643    }
1644
1645    IdLock.Entry lockEntry = getLockEntryForRollback(rootProc.getProcId());
1646    try {
1647      // Finalize the procedure state
1648      LOG.info("Rolled back {} exec-time={}", rootProc,
1649        StringUtils.humanTimeDiff(rootProc.elapsedTime()));
1650      procedureFinished(rootProc);
1651    } finally {
1652      if (lockEntry != null) {
1653        procExecutionLock.releaseLockEntry(lockEntry);
1654      }
1655    }
1656
1657    return LockState.LOCK_ACQUIRED;
1658  }
1659
1660  private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) {
1661    if (testing != null && testing.shouldKillBeforeStoreUpdateInRollback()) {
1662      kill("TESTING: Kill BEFORE store update in rollback: " + proc);
1663    }
1664    if (proc.removeStackIndex()) {
1665      if (!proc.isSuccess()) {
1666        proc.setState(ProcedureState.ROLLEDBACK);
1667      }
1668
1669      // update metrics on finishing the procedure (fail)
1670      proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), false);
1671
1672      if (proc.hasParent()) {
1673        store.delete(proc.getProcId());
1674        procedures.remove(proc.getProcId());
1675      } else {
1676        final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds();
1677        if (childProcIds != null) {
1678          store.delete(proc, childProcIds);
1679        } else {
1680          store.update(proc);
1681        }
1682      }
1683    } else {
1684      store.update(proc);
1685    }
1686  }
1687
1688  /**
1689   * Execute the rollback of the procedure step. It updates the store with the new state (stack
1690   * index) or will remove completly the procedure in case it is a child.
1691   */
1692  private LockState executeRollback(Procedure<TEnvironment> proc) {
1693    try {
1694      proc.doRollback(getEnvironment());
1695    } catch (IOException e) {
1696      LOG.debug("Roll back attempt failed for {}", proc, e);
1697      return LockState.LOCK_YIELD_WAIT;
1698    } catch (InterruptedException e) {
1699      handleInterruptedException(proc, e);
1700      return LockState.LOCK_YIELD_WAIT;
1701    } catch (Throwable e) {
1702      // Catch NullPointerExceptions or similar errors...
1703      LOG.error(HBaseMarkers.FATAL, "CODE-BUG: Uncaught runtime exception for " + proc, e);
1704    }
1705
1706    cleanupAfterRollbackOneStep(proc);
1707
1708    return LockState.LOCK_ACQUIRED;
1709  }
1710
1711  private void yieldProcedure(Procedure<TEnvironment> proc) {
1712    releaseLock(proc, false);
1713    scheduler.yield(proc);
1714  }
1715
  /**
   * Executes <code>procedure</code>
   * <ul>
   * <li>Calls the doExecute() of the procedure
   * <li>If the procedure execution didn't fail (i.e. valid user input)
   * <ul>
   * <li>...and returned subprocedures
   * <ul>
   * <li>The subprocedures are initialized.
   * <li>The subprocedures are added to the store
   * <li>The subprocedures are added to the runnable queue
   * <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete
   * </ul>
   * </li>
   * <li>...if there are no subprocedure
   * <ul>
   * <li>the procedure completed successfully
   * <li>if there is a parent (WAITING)
   * <li>the parent state will be set to RUNNABLE
   * </ul>
   * </li>
   * </ul>
   * </li>
   * <li>In case of failure
   * <ul>
   * <li>The store is updated with the new state</li>
   * <li>The executor (caller of this method) will start the rollback of the procedure</li>
   * </ul>
   * </li>
   * </ul>
   * <p>
   * The method returns early, without submitting children or waking up the parent, when the
   * procedure yields, is interrupted, or when the store is found not running after the update.
   * @param procStack state of the root procedure <code>procedure</code> belongs to; the executed
   *                  step is recorded on it as a rollback step
   * @param procedure the procedure to run, which must be in RUNNABLE state
   */
  private void execProcedure(RootProcedureState<TEnvironment> procStack,
    Procedure<TEnvironment> procedure) {
    Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
      "NOT RUNNABLE! " + procedure.toString());

    // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
    // The exception is caught below and then we hurry to the exit without disturbing state. The
    // idea is that the processing of this procedure will be unsuspended later by an external event
    // such the report of a region open.
    boolean suspended = false;

    // Whether to 're-' -execute; run through the loop again.
    boolean reExecute = false;

    Procedure<TEnvironment>[] subprocs = null;
    do {
      reExecute = false;
      procedure.resetPersistence();
      try {
        subprocs = procedure.doExecute(getEnvironment());
        // an empty array is treated the same as "no sub procedures"
        if (subprocs != null && subprocs.length == 0) {
          subprocs = null;
        }
      } catch (ProcedureSuspendedException e) {
        LOG.trace("Suspend {}", procedure);
        suspended = true;
      } catch (ProcedureYieldException e) {
        LOG.trace("Yield {}", procedure, e);
        yieldProcedure(procedure);
        return;
      } catch (InterruptedException e) {
        LOG.trace("Yield interrupt {}", procedure, e);
        handleInterruptedException(procedure, e);
        yieldProcedure(procedure);
        return;
      } catch (Throwable e) {
        // Catch NullPointerExceptions or similar errors...
        // The procedure is marked as failed instead of letting the error escape the worker.
        String msg = "CODE-BUG: Uncaught runtime exception: " + procedure;
        LOG.error(msg, e);
        procedure.setFailure(new RemoteProcedureException(msg, e));
      }

      if (!procedure.isFailed()) {
        if (subprocs != null) {
          if (subprocs.length == 1 && subprocs[0] == procedure) {
            // Procedure returned itself. Quick-shortcut for a state machine-like procedure;
            // i.e. we go around this loop again rather than go back out on the scheduler queue.
            subprocs = null;
            reExecute = true;
            LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId());
          } else {
            // Yield the current procedure, and make the subprocedure runnable
            // subprocs may come back 'null'.
            subprocs = initializeChildren(procStack, procedure, subprocs);
            LOG.info("Initialized subprocedures=" + (subprocs == null
              ? null
              : Stream.of(subprocs).map(e -> "{" + e.toString() + "}").collect(Collectors.toList())
                .toString()));
          }
        } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
          LOG.trace("Added to timeoutExecutor {}", procedure);
          timeoutExecutor.add(procedure);
        } else if (!suspended) {
          // No subtask, so we are done
          procedure.setState(ProcedureState.SUCCESS);
        }
      }

      // allows to kill the executor before something is stored to the wal.
      // useful to test the procedure recovery.
      if (
        testing != null && testing.shouldKillBeforeStoreUpdate(suspended, procedure.hasParent())
      ) {
        kill("TESTING: Kill BEFORE store update: " + procedure);
      }

      // TODO: The code here doesn't check if store is running before persisting to the store as
      // it relies on the method call below to throw RuntimeException to wind up the stack and
      // executor thread to stop. The statement following the method call below seems to check if
      // store is not running, to prevent scheduling children procedures, re-execution or yield
      // of this procedure. This may need more scrutiny and subsequent cleanup in future
      //
      // Commit the transaction even if a suspend (state may have changed). Note this append
      // can take a bunch of time to complete.
      if (procedure.needPersistence()) {
        // Add the procedure to the stack
        // See HBASE-28210 on why we need synchronized here
        boolean needUpdateStoreOutsideLock = false;
        synchronized (procStack) {
          if (procStack.addRollbackStep(procedure)) {
            updateStoreOnExec(procStack, procedure, subprocs);
          } else {
            needUpdateStoreOutsideLock = true;
          }
        }
        // this is an optimization if we do not need to maintain rollback step, as all subprocedures
        // of the same root procedure share the same root procedure state, if we can only update
        // store under the above lock, the sub procedures of the same root procedure can only be
        // persistent sequentially, which will have a bad performance. See HBASE-28212 for more
        // details.
        if (needUpdateStoreOutsideLock) {
          updateStoreOnExec(procStack, procedure, subprocs);
        }
      }

      // if the store is not running we are aborting
      if (!store.isRunning()) {
        return;
      }
      // if the procedure is kind enough to pass the slot to someone else, yield
      if (
        procedure.isRunnable() && !suspended
          && procedure.isYieldAfterExecutionStep(getEnvironment())
      ) {
        yieldProcedure(procedure);
        return;
      }

      // a re-execution is only requested by the short-circuit branch above, which clears subprocs
      assert (reExecute && subprocs == null) || !reExecute;
    } while (reExecute);

    // Allows to kill the executor after something is stored to the WAL but before the below
    // state settings are done -- in particular the one on the end where we make parent
    // RUNNABLE again when its children are done; see countDownChildren.
    if (testing != null && testing.shouldKillAfterStoreUpdate(suspended)) {
      kill("TESTING: Kill AFTER store update: " + procedure);
    }

    // Submit the new subprocedures
    if (subprocs != null && !procedure.isFailed()) {
      submitChildrenProcedures(subprocs);
    }

    // we need to log the release lock operation before waking up the parent procedure, as there
    // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all
    // the sub procedures from store and cause problems...
    releaseLock(procedure, false);

    // if the procedure is complete and has a parent, count down the children latch.
    // If 'suspended', do nothing to change state -- let other threads handle unsuspend event.
    if (!suspended && procedure.isFinished() && procedure.hasParent()) {
      countDownChildren(procStack, procedure);
    }
  }
1891
1892  private void kill(String msg) {
1893    LOG.debug(msg);
1894    stop();
1895    throw new RuntimeException(msg);
1896  }
1897
1898  private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack,
1899    Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
1900    assert subprocs != null : "expected subprocedures";
1901    final long rootProcId = getRootProcedureId(procedure);
1902    for (int i = 0; i < subprocs.length; ++i) {
1903      Procedure<TEnvironment> subproc = subprocs[i];
1904      if (subproc == null) {
1905        String msg = "subproc[" + i + "] is null, aborting the procedure";
1906        procedure
1907          .setFailure(new RemoteProcedureException(msg, new IllegalArgumentIOException(msg)));
1908        return null;
1909      }
1910
1911      assert subproc.getState() == ProcedureState.INITIALIZING : subproc;
1912      subproc.setParentProcId(procedure.getProcId());
1913      subproc.setRootProcId(rootProcId);
1914      subproc.setProcId(nextProcId());
1915      procStack.addSubProcedure(subproc);
1916    }
1917
1918    if (!procedure.isFailed()) {
1919      procedure.setChildrenLatch(subprocs.length);
1920      switch (procedure.getState()) {
1921        case RUNNABLE:
1922          procedure.setState(ProcedureState.WAITING);
1923          break;
1924        case WAITING_TIMEOUT:
1925          timeoutExecutor.add(procedure);
1926          break;
1927        default:
1928          break;
1929      }
1930    }
1931    return subprocs;
1932  }
1933
1934  private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) {
1935    for (int i = 0; i < subprocs.length; ++i) {
1936      Procedure<TEnvironment> subproc = subprocs[i];
1937      subproc.updateMetricsOnSubmit(getEnvironment());
1938      assert !procedures.containsKey(subproc.getProcId());
1939      procedures.put(subproc.getProcId(), subproc);
1940      scheduler.addFront(subproc);
1941    }
1942  }
1943
1944  private void countDownChildren(RootProcedureState<TEnvironment> procStack,
1945    Procedure<TEnvironment> procedure) {
1946    Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId());
1947    if (parent == null) {
1948      assert procStack.isRollingback();
1949      return;
1950    }
1951
1952    // If this procedure is the last child awake the parent procedure
1953    if (parent.tryRunnable()) {
1954      // If we succeeded in making the parent runnable -- i.e. all of its
1955      // children have completed, move parent to front of the queue.
1956      store.update(parent);
1957      scheduler.addFront(parent);
1958      LOG.info("Finished subprocedure pid={}, resume processing ppid={}", procedure.getProcId(),
1959        parent.getProcId());
1960      return;
1961    }
1962  }
1963
1964  private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack,
1965    Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
1966    if (subprocs != null && !procedure.isFailed()) {
1967      if (LOG.isTraceEnabled()) {
1968        LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs));
1969      }
1970      store.insert(procedure, subprocs);
1971    } else {
1972      LOG.trace("Store update {}", procedure);
1973      if (procedure.isFinished() && !procedure.hasParent()) {
1974        // remove child procedures
1975        final long[] childProcIds = procStack.getSubprocedureIds();
1976        if (childProcIds != null) {
1977          store.delete(procedure, childProcIds);
1978          for (int i = 0; i < childProcIds.length; ++i) {
1979            procedures.remove(childProcIds[i]);
1980          }
1981        } else {
1982          store.update(procedure);
1983        }
1984      } else {
1985        store.update(procedure);
1986      }
1987    }
1988  }
1989
1990  private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) {
1991    LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e);
1992    // NOTE: We don't call Thread.currentThread().interrupt()
1993    // because otherwise all the subsequent calls e.g. Thread.sleep() will throw
1994    // the InterruptedException. If the master is going down, we will be notified
1995    // and the executor/store will be stopped.
1996    // (The interrupted procedure will be retried on the next run)
1997  }
1998
1999  private void execCompletionCleanup(Procedure<TEnvironment> proc) {
2000    final TEnvironment env = getEnvironment();
2001    if (proc.hasLock()) {
2002      LOG.warn("Usually this should not happen, we will release the lock before if the procedure"
2003        + " is finished, even if the holdLock is true, arrive here means we have some holes where"
2004        + " we do not release the lock. And the releaseLock below may fail since the procedure may"
2005        + " have already been deleted from the procedure store.");
2006      releaseLock(proc, true);
2007    }
2008    try {
2009      proc.completionCleanup(env);
2010    } catch (Throwable e) {
2011      // Catch NullPointerExceptions or similar errors...
2012      LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
2013    }
2014  }
2015
2016  private void procedureFinished(Procedure<TEnvironment> proc) {
2017    // call the procedure completion cleanup handler
2018    execCompletionCleanup(proc);
2019
2020    CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc);
2021
2022    // update the executor internal state maps
2023    if (!proc.shouldWaitClientAck(getEnvironment())) {
2024      retainer.setClientAckTime(0);
2025    }
2026
2027    completed.put(proc.getProcId(), retainer);
2028    rollbackStack.remove(proc.getProcId());
2029    procedures.remove(proc.getProcId());
2030
2031    // call the runnableSet completion cleanup handler
2032    try {
2033      scheduler.completionCleanup(proc);
2034    } catch (Throwable e) {
2035      // Catch NullPointerExceptions or similar errors...
2036      LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e);
2037    }
2038
2039    // Notify the listeners
2040    sendProcedureFinishedNotification(proc.getProcId());
2041  }
2042
2043  RootProcedureState<TEnvironment> getProcStack(long rootProcId) {
2044    return rollbackStack.get(rootProcId);
2045  }
2046
2047  ProcedureScheduler getProcedureScheduler() {
2048    return scheduler;
2049  }
2050
2051  int getCompletedSize() {
2052    return completed.size();
2053  }
2054
2055  public IdLock getProcExecutionLock() {
2056    return procExecutionLock;
2057  }
2058
2059  // ==========================================================================
2060  // Worker Thread
2061  // ==========================================================================
2062  private class WorkerThread extends StoppableThread {
2063    private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
2064    private volatile Procedure<TEnvironment> activeProcedure;
2065
2066    public WorkerThread(ThreadGroup group) {
2067      this(group, "PEWorker-");
2068    }
2069
2070    protected WorkerThread(ThreadGroup group, String prefix) {
2071      super(group, prefix + workerId.incrementAndGet());
2072      setDaemon(true);
2073    }
2074
2075    @Override
2076    public void sendStopSignal() {
2077      scheduler.signalAll();
2078    }
2079
2080    /**
2081     * Encapsulates execution of the current {@link #activeProcedure} for easy tracing.
2082     */
2083    private long runProcedure() throws IOException {
2084      final Procedure<TEnvironment> proc = this.activeProcedure;
2085      int activeCount = activeExecutorCount.incrementAndGet();
2086      int runningCount = store.setRunningProcedureCount(activeCount);
2087      LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(), runningCount,
2088        activeCount);
2089      executionStartTime.set(EnvironmentEdgeManager.currentTime());
2090      IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
2091      try {
2092        executeProcedure(proc);
2093      } catch (AssertionError e) {
2094        LOG.info("ASSERT pid=" + proc.getProcId(), e);
2095        throw e;
2096      } finally {
2097        procExecutionLock.releaseLockEntry(lockEntry);
2098        activeCount = activeExecutorCount.decrementAndGet();
2099        runningCount = store.setRunningProcedureCount(activeCount);
2100        LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(), runningCount,
2101          activeCount);
2102        this.activeProcedure = null;
2103        executionStartTime.set(Long.MAX_VALUE);
2104      }
2105      return EnvironmentEdgeManager.currentTime();
2106    }
2107
2108    @Override
2109    public void run() {
2110      long lastUpdate = EnvironmentEdgeManager.currentTime();
2111      try {
2112        while (isRunning() && keepAlive(lastUpdate)) {
2113          @SuppressWarnings("unchecked")
2114          Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
2115          if (proc == null) {
2116            continue;
2117          }
2118          this.activeProcedure = proc;
2119          lastUpdate = TraceUtil.trace(this::runProcedure, new ProcedureSpanBuilder(proc));
2120        }
2121      } catch (Throwable t) {
2122        LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);
2123      } finally {
2124        LOG.trace("Worker terminated.");
2125      }
2126      workerThreads.remove(this);
2127    }
2128
2129    @Override
2130    public String toString() {
2131      Procedure<?> p = this.activeProcedure;
2132      return getName() + "(pid=" + (p == null ? Procedure.NO_PROC_ID : p.getProcId() + ")");
2133    }
2134
2135    /** Returns the time since the current procedure is running */
2136    public long getCurrentRunTime() {
2137      return EnvironmentEdgeManager.currentTime() - executionStartTime.get();
2138    }
2139
2140    // core worker never timeout
2141    protected boolean keepAlive(long lastUpdate) {
2142      return true;
2143    }
2144  }
2145
2146  // A worker thread which can be added when core workers are stuck. Will timeout after
2147  // keepAliveTime if there is no procedure to run.
2148  private final class KeepAliveWorkerThread extends WorkerThread {
2149    public KeepAliveWorkerThread(ThreadGroup group) {
2150      super(group, "KeepAlivePEWorker-");
2151    }
2152
2153    @Override
2154    protected boolean keepAlive(long lastUpdate) {
2155      return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime;
2156    }
2157  }
2158
  // ----------------------------------------------------------------------------
  // TODO-MAYBE: Should we provide an InlineChore to notify the store with the
  // full set of procedures pending and completed to write a compacted
  // version of the log (in case it is a log)?
  // In theory no, procedures have a short life, so at some point the store
  // will have the tracker saying everything is in the last log.
  // ----------------------------------------------------------------------------
2166
2167  private final class WorkerMonitor extends InlineChore {
2168    public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =
2169      "hbase.procedure.worker.monitor.interval.msec";
2170    private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec
2171
2172    public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =
2173      "hbase.procedure.worker.stuck.threshold.msec";
2174    private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec
2175
2176    public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =
2177      "hbase.procedure.worker.add.stuck.percentage";
2178    private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck
2179
2180    private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;
2181    private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;
2182    private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;
2183
2184    public WorkerMonitor() {
2185      refreshConfig();
2186    }
2187
2188    @Override
2189    public void run() {
2190      final int stuckCount = checkForStuckWorkers();
2191      checkThreadCount(stuckCount);
2192
2193      // refresh interval (poor man dynamic conf update)
2194      refreshConfig();
2195    }
2196
2197    private int checkForStuckWorkers() {
2198      // check if any of the worker is stuck
2199      int stuckCount = 0;
2200      for (WorkerThread worker : workerThreads) {
2201        if (worker.getCurrentRunTime() < stuckThreshold) {
2202          continue;
2203        }
2204
2205        // WARN the worker is stuck
2206        stuckCount++;
2207        LOG.warn("Worker stuck {}, run time {}", worker,
2208          StringUtils.humanTimeDiff(worker.getCurrentRunTime()));
2209      }
2210      return stuckCount;
2211    }
2212
2213    private void checkThreadCount(final int stuckCount) {
2214      // nothing to do if there are no runnable tasks
2215      if (stuckCount < 1 || !scheduler.hasRunnables()) {
2216        return;
2217      }
2218
2219      // add a new thread if the worker stuck percentage exceed the threshold limit
2220      // and every handler is active.
2221      final float stuckPerc = ((float) stuckCount) / workerThreads.size();
2222      // let's add new worker thread more aggressively, as they will timeout finally if there is no
2223      // work to do.
2224      if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) {
2225        final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
2226        workerThreads.add(worker);
2227        worker.start();
2228        LOG.debug("Added new worker thread {}", worker);
2229      }
2230    }
2231
2232    private void refreshConfig() {
2233      addWorkerStuckPercentage =
2234        conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY, DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);
2235      timeoutInterval =
2236        conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY, DEFAULT_WORKER_MONITOR_INTERVAL);
2237      stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY, DEFAULT_WORKER_STUCK_THRESHOLD);
2238    }
2239
2240    @Override
2241    public int getTimeoutInterval() {
2242      return timeoutInterval;
2243    }
2244  }
2245}