Class ProcedureExecutor<TEnvironment>
java.lang.Object
org.apache.hadoop.hbase.procedure2.ProcedureExecutor<TEnvironment>
Thread pool that executes the submitted procedures. The executor has an associated ProcedureStore.
Each operation is logged, and on restart the pending procedures are resumed. Unless the Procedure
code throws an error (e.g. invalid user input), the procedure will complete at some point in
time. On restart, the pending procedures are resumed and the ones that failed are rolled back. The
user can add procedures to the executor via submitProcedure(proc), check for the finished state
via isFinished(procId), and get the result via getResult(procId).
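The submit, poll, and fetch flow described above can be illustrated with a small self-contained model. This is only a sketch: ToyProcedureExecutor is a hypothetical stand-in written for this example, not the HBase class. It borrows the three method names from the description but keeps no ProcedureStore, does no logging, and performs no rollback.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Toy stand-in for the submit / isFinished / getResult flow. NOT the HBase class. */
final class ToyProcedureExecutor {
  private final ExecutorService workers = Executors.newFixedThreadPool(2);
  private final AtomicLong lastProcId = new AtomicLong(0);
  // Results of finished work, keyed by the id handed out at submission.
  private final Map<Long, String> completed = new ConcurrentHashMap<>();

  /** Queues the work on the pool and returns the id used to track it. */
  long submitProcedure(Supplier<String> proc) {
    long procId = lastProcId.incrementAndGet();
    workers.execute(() -> completed.put(procId, proc.get()));
    return procId;
  }

  boolean isFinished(long procId) {
    return completed.containsKey(procId);
  }

  String getResult(long procId) {
    return completed.get(procId);
  }

  void stop() {
    workers.shutdown();
  }

  /** Submits one unit of work, polls until it finishes, and returns its result. */
  static String runOnce(String value) {
    ToyProcedureExecutor executor = new ToyProcedureExecutor();
    try {
      long procId = executor.submitProcedure(() -> value);
      while (!executor.isFinished(procId)) {
        TimeUnit.MILLISECONDS.sleep(1);
      }
      return executor.getResult(procId);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    } finally {
      executor.stop();
    }
  }

  public static void main(String[] args) {
    System.out.println(runOnce("done"));
  }
}
```

The caller only ever holds the returned id, which is why the result map is keyed by procId.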
-
Nested Class Summary
Modifier and TypeClassDescriptionprivate final class
static interface
static class
Class with parameters describing how to fail/die when in testing-context.private final class
private class
-
Field Summary
Modifier and TypeFieldDescriptionprivate final AtomicInteger
private ExecutorService
static final String
private final boolean
private final ConcurrentHashMap<Long,
CompletedProcedureRetainer<TEnvironment>> Map the procId returned by submitProcedure(), the Root-ProcID, to the Procedure.private org.apache.hadoop.conf.Configuration
private int
(package private) static final int
private static final boolean
(package private) static final int
private static final long
private final TEnvironment
static final String
static final String
private ExecutorService
private long
private final AtomicLong
private final CopyOnWriteArrayList<ProcedureExecutor.ProcedureExecutorListener>
private static final org.slf4j.Logger
private int
private final ConcurrentHashMap<NonceKey,
Long> Helper map to lookup whether the procedure already issued from the same client.private final ConcurrentHashMap<Long,
Procedure<TEnvironment>> Helper map to lookup the live procedures by ID.private final IdLock
private final ConcurrentHashMap<Long,
RootProcedureState<TEnvironment>> Map the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.private final AtomicBoolean
private final ProcedureScheduler
Scheduler/Queue that contains runnable procedures.private final ProcedureStore
(package private) ProcedureExecutor.Testing
testing
is non-null when ProcedureExecutor is being tested.private ThreadGroup
Created in the init(int, boolean)
method.private TimeoutExecutorThread<TEnvironment>
Created in the init(int, boolean)
method.static final String
private final AtomicLong
private TimeoutExecutorThread<TEnvironment>
WorkerMonitor checks for stuck workers and creates a new worker thread when necessary; for example, if there is no worker available to assign meta, it creates a new worker thread for it, so it is very important. Created in the init(int, boolean)
method. -
Constructor Summary
ConstructorDescriptionProcedureExecutor
(org.apache.hadoop.conf.Configuration conf, TEnvironment environment, ProcedureStore store) ProcedureExecutor
(org.apache.hadoop.conf.Configuration conf, TEnvironment environment, ProcedureStore store, ProcedureScheduler scheduler) -
Method Summary
Modifier and TypeMethodDescriptionboolean
abort
(long procId) Send an abort notification to the specified procedure.boolean
abort
(long procId, boolean mayInterruptIfRunning) Send an abort notification to the specified procedure.private Procedure.LockState
acquireLock
(Procedure<TEnvironment> proc) void
Add a chore procedure to the executor(package private) boolean
bypassProcedure
(long pid, long lockWait, boolean override, boolean recursive) bypassProcedure
(List<Long> pids, long lockWait, boolean force, boolean recursive) Bypass a procedure.private void
private void
countDownChildren
(RootProcedureState<TEnvironment> procStack, Procedure<TEnvironment> procedure) createNonceKey
(long nonceGroup, long nonce) Create a NonceKey from the specified nonceGroup and nonce.private void
private void
execProcedure
(RootProcedureState<TEnvironment> procStack, Procedure<TEnvironment> procedure) Executes procedure
Calls the doExecute() of the procedure If the procedure execution didn't fail (i.e.private Procedure.LockState
executeNormalRollback
(Procedure<TEnvironment> rootProc, RootProcedureState<TEnvironment> procStack) private void
private Procedure.LockState
executeRollback
(long rootProcId, RootProcedureState<TEnvironment> procStack) Execute the rollback of the full procedure stack.private Procedure.LockState
executeRollback
(Procedure<TEnvironment> proc) Execute the rollback of the procedure step.private void
executeUnexpectedRollback
(Procedure<TEnvironment> rootProc, RootProcedureState<TEnvironment> procStack) private void
forceUpdateProcedure
(long procId) int
Should only be used when starting up, where the procedure workers have not been started.Get a thread pool for executing some asynchronous tasks(package private) int
int
Returns the core pool size settings.long
getKeepAliveTime
(TimeUnit timeUnit) protected long
private IdLock.Entry
getLockEntryForRollback
(long procId) getProcedure
(long procId) <T extends Procedure<TEnvironment>>
TgetProcedure
(Class<T> clazz, long procId) Get procedures.(package private) ProcedureScheduler
(package private) RootProcedureState<TEnvironment>
getProcStack
(long rootProcId) getResult
(long procId) getResultOrProcedure
(long procId) (package private) Long
(package private) ProcedureScheduler
getStore()
int
Returns the current number of worker threads.private void
void
init
(int numThreads, boolean abortOnCorruption) Initialize the procedure executor, but do not start workers.private Procedure<TEnvironment>[]
initializeChildren
(RootProcedureState<TEnvironment> procStack, Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) private void
initializeStacks
(ProcedureStore.ProcedureIterator procIter, List<Procedure<TEnvironment>> runnableList, List<Procedure<TEnvironment>> failedList, List<Procedure<TEnvironment>> waitingList, List<Procedure<TEnvironment>> waitingTimeoutList) boolean
isFinished
(long procId) Return true if the procedure is finished.boolean
isProcedureOwner
(long procId, User user) Check if the user is this procedure's ownerprivate boolean
isRootFinished
(Procedure<?> proc) boolean
boolean
isStarted
(long procId) Return true if the procedure is started.void
join()
private void
private void
load
(boolean abortOnCorruption) private void
private long
private Procedure<TEnvironment>
private void
processWaitingProcedures
(List<Procedure<TEnvironment>> waitingList, List<Procedure<TEnvironment>> runnableList) private void
processWaitingTimeoutProcedures
(List<Procedure<TEnvironment>> waitingTimeoutList) private long
pushProcedure
(Procedure<TEnvironment> proc) private void
pushProceduresAfterLoad
(List<Procedure<TEnvironment>> runnableList, List<Procedure<TEnvironment>> failedList) void
refreshConfiguration
(org.apache.hadoop.conf.Configuration conf) void
long
registerNonce
(NonceKey nonceKey) Register a nonce for a procedure that is going to be submitted.private void
releaseLock
(Procedure<TEnvironment> proc, boolean force) boolean
Remove a chore procedure from the executorvoid
removeResult
(long procId) Mark the specified completed procedure as ready to remove.private void
restoreLock
(Procedure<TEnvironment> proc, Set<Long> restored) private void
private void
restoreLocks
(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) private void
private void
sendProcedureAddedNotification
(long procId) private void
sendProcedureFinishedNotification
(long procId) private void
sendProcedureLoadedNotification
(long procId) void
setFailureResultForNonce
(NonceKey nonceKey, String procName, User procOwner, IOException exception) If the procedure failed before it was submitted, we may want to give back the same error to the requests with the same nonceKey.void
setKeepAliveTime
(long keepAliveTime, TimeUnit timeUnit) void
Start the workers.void
stop()
private void
submitChildrenProcedures
(Procedure<TEnvironment>[] subprocs) long
submitProcedure
(Procedure<TEnvironment> proc) Add a new root-procedure to the executor.long
submitProcedure
(Procedure<TEnvironment> proc, NonceKey nonceKey) Add a new root-procedure to the executor.void
submitProcedures
(Procedure<TEnvironment>[] procs) Add a set of new root-procedures to the executor.boolean
void
Remove the NonceKey if the procedure was not submitted to the executor.private void
updateStoreOnExec
(RootProcedureState<TEnvironment> procStack, Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) private void
yieldProcedure
(Procedure<TEnvironment> proc)
-
Field Details
-
LOG
-
CHECK_OWNER_SET_CONF_KEY
- See Also:
-
DEFAULT_CHECK_OWNER_SET
- See Also:
-
WORKER_KEEP_ALIVE_TIME_CONF_KEY
- See Also:
-
DEFAULT_WORKER_KEEP_ALIVE_TIME
-
EVICT_TTL_CONF_KEY
- See Also:
-
DEFAULT_EVICT_TTL
- See Also:
-
EVICT_ACKED_TTL_CONF_KEY
- See Also:
-
DEFAULT_ACKED_EVICT_TTL
- See Also:
-
testing
testing
is non-null when ProcedureExecutor is being tested. Tests will try to break PE having it fail at various junctures. When non-null, testing is set to an instance of the internal ProcedureExecutor.Testing
class with flags set for the particular test. -
completed
Map the procId returned by submitProcedure(), the Root-ProcID, to the Procedure. Once a Root-Procedure completes (success or failure), the result will be added to this map. The user of ProcedureExecutor should call getResult(procId) to get the result. -
rollbackStack
Map the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState. The RootProcedureState contains the execution stack of the Root-Procedure. It is added to the map by submitProcedure() and removed on procedure completion. -
procedures
Helper map to look up the live procedures by ID. This map contains every procedure: root-procedures and subprocedures. -
nonceKeysToProcIdsMap
Helper map to lookup whether the procedure already issued from the same client. This map contains every root procedure. -
listeners
-
conf
-
threadGroup
Created in the init(int, boolean)
method. Destroyed in join()
(FIX! Doing resource handling rather than observing in a #join is unexpected). Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok). -
workerThreads
Created in the init(int, boolean)
method. Terminated in join()
(FIX! Doing resource handling rather than observing in a #join is unexpected). Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok). -
timeoutExecutor
Created in the init(int, boolean)
method. Terminated in join()
(FIX! Doing resource handling rather than observing in a #join is unexpected). Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok). -
workerMonitorExecutor
WorkerMonitor checks for stuck workers and creates a new worker thread when necessary; for example, if there is no worker available to assign meta, it creates a new worker thread for it, so it is very important. TimeoutExecutor executes many tasks, such as DeadServerMetricRegionChore and RegionInTransitionChore; some of these tasks may run for a long time and block other tasks such as WorkerMonitor, so a dedicated thread is used for executing WorkerMonitor. -
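The reasoning behind the dedicated thread can be shown with plain java.util.concurrent executors. This is a hedged sketch, not the executor's actual wiring: the class name and both single-thread executors are hypothetical stand-ins for the shared timeout thread and the dedicated monitor thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustrative only: a monitor task on its own thread is not queued behind a slow chore. */
final class DedicatedMonitorSketch {
  static boolean monitorRunsWhileChoreBlocks() {
    ExecutorService sharedTimeoutThread = Executors.newSingleThreadExecutor();
    ExecutorService dedicatedMonitorThread = Executors.newSingleThreadExecutor();
    CountDownLatch releaseChore = new CountDownLatch(1);
    CountDownLatch monitorRan = new CountDownLatch(1);
    try {
      // A long-running chore occupies the only shared thread.
      sharedTimeoutThread.execute(() -> {
        try {
          releaseChore.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      // The monitor has its own thread, so it does not wait for the chore.
      dedicatedMonitorThread.execute(monitorRan::countDown);
      return monitorRan.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      // Unblock the chore and let both threads exit.
      releaseChore.countDown();
      sharedTimeoutThread.shutdown();
      dedicatedMonitorThread.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(monitorRunsWhileChoreBlocks());
  }
}
```

Had the monitor been submitted to the shared executor instead, it would have stayed queued until the chore was released.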
forceUpdateExecutor
-
asyncTaskExecutor
-
corePoolSize
-
maxPoolSize
-
keepAliveTime
-
scheduler
Scheduler/Queue that contains runnable procedures. -
lastProcId
-
workerId
-
activeExecutorCount
-
running
-
environment
-
store
-
checkOwnerSet
-
procExecutionLock
-
-
Constructor Details
-
ProcedureExecutor
public ProcedureExecutor(org.apache.hadoop.conf.Configuration conf, TEnvironment environment, ProcedureStore store) -
ProcedureExecutor
public ProcedureExecutor(org.apache.hadoop.conf.Configuration conf, TEnvironment environment, ProcedureStore store, ProcedureScheduler scheduler)
-
-
Method Details
-
isRootFinished
-
forceUpdateProcedure
- Throws:
IOException
-
load
- Throws:
IOException
-
restoreLock
-
restoreLocks
-
restoreLocks
-
initializeStacks
private void initializeStacks(ProcedureStore.ProcedureIterator procIter, List<Procedure<TEnvironment>> runnableList, List<Procedure<TEnvironment>> failedList, List<Procedure<TEnvironment>> waitingList, List<Procedure<TEnvironment>> waitingTimeoutList) throws IOException - Throws:
IOException
-
processWaitingProcedures
private void processWaitingProcedures(List<Procedure<TEnvironment>> waitingList, List<Procedure<TEnvironment>> runnableList) -
processWaitingTimeoutProcedures
-
pushProceduresAfterLoad
private void pushProceduresAfterLoad(List<Procedure<TEnvironment>> runnableList, List<Procedure<TEnvironment>> failedList) -
loadProcedures
- Throws:
IOException
-
init
Initialize the procedure executor, but do not start workers. We will start them later. It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease and ensure a single executor, and to start the procedure replay that resumes and recovers the previously pending and in-progress procedures.
- Parameters:
numThreads
- number of threads available for procedure execution.
abortOnCorruption
- true if you want to abort your service in case a corrupted procedure is found on replay; otherwise false.
- Throws:
IOException
-
startWorkers
Start the workers.
- Throws:
IOException
-
stop
-
join
-
refreshConfiguration
-
isRunning
-
getWorkerThreadCount
Returns the current number of worker threads. -
getCorePoolSize
Returns the core pool size settings. -
getActiveExecutorCount
-
getEnvironment
-
getStore
-
getScheduler
-
setKeepAliveTime
-
getKeepAliveTime
-
addChore
Add a chore procedure to the executor.
- Parameters:
chore
- the chore to add
-
removeChore
Remove a chore procedure from the executor.
- Parameters:
chore
- the chore to remove
- Returns:
- whether the chore is removed, or it will be removed later
-
createNonceKey
Create a NonceKey from the specified nonceGroup and nonce. -
registerNonce
Register a nonce for a procedure that is going to be submitted. A procId will be reserved, and on submitProcedure() the procedure with the specified nonce will take the reserved procId. If someone already reserved the nonce, this method returns the reserved procId; otherwise it returns an invalid procId, and the caller should proceed and submit the procedure.
- Parameters:
nonceKey
- A unique identifier for this operation from the client or process.
- Returns:
- the procId associated with the nonce, if any; otherwise an invalid procId.
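The reserve-then-submit contract can be modelled in a few lines. The following is a sketch under assumptions, not the executor's implementation: NonceRegistrySketch is a hypothetical class, nonces are plain strings instead of NonceKey, and the value -1 used for the invalid procId is an assumption made for this example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative model of nonce reservation. NOT the HBase implementation. */
final class NonceRegistrySketch {
  // Assumed sentinel for "no procedure id"; chosen for this sketch only.
  static final long INVALID_PROC_ID = -1L;

  private final ConcurrentHashMap<String, Long> nonceToProcId = new ConcurrentHashMap<>();
  private final AtomicLong lastProcId = new AtomicLong(0);

  /**
   * Returns the procId already reserved for this nonce, or reserves a new one
   * and returns INVALID_PROC_ID to tell the caller to go ahead and submit.
   */
  long registerNonce(String nonceKey) {
    Long existing = nonceToProcId.get(nonceKey);
    if (existing != null) {
      return existing;
    }
    long reserved = lastProcId.incrementAndGet();
    // putIfAbsent covers the race where another caller reserved the nonce first.
    Long raced = nonceToProcId.putIfAbsent(nonceKey, reserved);
    return raced == null ? INVALID_PROC_ID : raced;
  }

  /** The submitted procedure takes the id that registerNonce reserved. */
  long submitProcedure(String nonceKey) {
    Long reserved = nonceToProcId.get(nonceKey);
    if (reserved == null) {
      throw new IllegalStateException("nonce was not registered: " + nonceKey);
    }
    return reserved;
  }

  public static void main(String[] args) {
    NonceRegistrySketch registry = new NonceRegistrySketch();
    System.out.println(registry.registerNonce("client-1:42")); // first call reserves an id
    System.out.println(registry.submitProcedure("client-1:42"));
    System.out.println(registry.registerNonce("client-1:42")); // retry sees the reserved id
  }
}
```

A retried request with the same nonce gets back the existing procId instead of creating a second procedure, which is the point of the reservation.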
-
unregisterNonceIfProcedureWasNotSubmitted
Remove the NonceKey if the procedure was not submitted to the executor.
- Parameters:
nonceKey
- A unique identifier for this operation from the client or process.
-
setFailureResultForNonce
public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner, IOException exception)
If the procedure failed before it was submitted, we may want to give back the same error to the requests with the same nonceKey.
- Parameters:
nonceKey
- A unique identifier for this operation from the client or process
procName
- name of the procedure, used to inform the user
procOwner
- name of the owner of the procedure, used to inform the user
exception
- the failure to report to the user
-
submitProcedure
Add a new root-procedure to the executor.
- Parameters:
proc
- the new procedure to execute.
- Returns:
- the procedure id, which can be used to monitor the operation
-
bypassProcedure
public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force, boolean recursive) throws IOException
Bypass a procedure. If the procedure is set to bypass, all the logic in execute/rollback will be ignored and it will return success regardless. It is used to recover buggy, stuck procedures, releasing the lock resources and letting other procedures run. Bypassing one procedure (its ancestors will be bypassed automatically) may leave the cluster in an intermediate state, e.g. a region not assigned, or some HDFS files left behind. After getting rid of those stuck procedures, operators may have to do some cleanup on HDFS or schedule some assign procedures to bring regions online. DO AT YOUR OWN RISK.
A procedure can be bypassed only if:
1. The procedure is in state RUNNABLE, WAITING, or WAITING_TIMEOUT, or it is a root procedure without any child.
2. No other worker thread is executing it.
3. No child procedure has been submitted.
If all the requirements are met, the procedure and its ancestors will be bypassed and persisted to the WAL.
If the procedure is in WAITING state, it will be set to RUNNABLE and added to the run queue. TODO: What about WAITING_TIMEOUT?
- Parameters:
pids
- the procedure ids
lockWait
- time to wait for the lock
force
- if force is set to true, we will bypass the procedure even if it is executing. This is for procedures that can't break out during execution (mostly due to a bug). In this case, bypassing the procedure is not enough, since it is already stuck there. We need to restart the master after bypassing, letting the problematic procedure execute with bypass=true, so that the procedure can be successfully bypassed.
recursive
- We will do an expensive search for children of each pid. EXPENSIVE!
- Returns:
- true if the bypass succeeded
- Throws:
IOException
- IOException
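The three bypass preconditions listed for bypassProcedure can be read as a single predicate. The sketch below is one literal reading of that list, not the executor's code: the class, the reduced State enum (OTHER stands for any state not named in the list), and the boolean inputs are all hypothetical, and treating force as skipping only the "no worker is executing it" check is an assumption drawn from the force parameter description.

```java
import java.util.EnumSet;

/** Literal reading of the documented bypass preconditions. NOT the HBase implementation. */
final class BypassCheckSketch {
  // Reduced, hypothetical state set: OTHER stands for any state not named in the list.
  enum State { RUNNABLE, WAITING, WAITING_TIMEOUT, OTHER }

  private static final EnumSet<State> BYPASSABLE_STATES =
      EnumSet.of(State.RUNNABLE, State.WAITING, State.WAITING_TIMEOUT);

  static boolean canBypass(State state, boolean isRoot, boolean hasChildren,
      boolean beingExecuted, boolean force) {
    // 1. Bypassable state, or a root procedure without any child.
    boolean stateOk = BYPASSABLE_STATES.contains(state) || (isRoot && !hasChildren);
    // 2. No worker is executing it (assumed to be waived when force is true).
    boolean notExecuting = force || !beingExecuted;
    // 3. No child procedure has been submitted.
    boolean noChildren = !hasChildren;
    return stateOk && notExecuting && noChildren;
  }

  public static void main(String[] args) {
    System.out.println(canBypass(State.RUNNABLE, false, false, false, false));
    System.out.println(canBypass(State.RUNNABLE, false, false, true, false));
    System.out.println(canBypass(State.RUNNABLE, false, false, true, true));
  }
}
```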
-
bypassProcedure
boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive) throws IOException - Throws:
IOException
-
submitProcedure
Add a new root-procedure to the executor.
- Parameters:
proc
- the new procedure to execute.
nonceKey
- the registered unique identifier for this operation from the client or process.
- Returns:
- the procedure id, which can be used to monitor the operation
-
submitProcedures
Add a set of new root-procedures to the executor.
- Parameters:
procs
- the new procedures to execute.
-
prepareProcedure
-
pushProcedure
-
abort
Send an abort notification to the specified procedure. Depending on the procedure implementation, the abort can be considered or ignored.
- Parameters:
procId
- the procedure to abort
- Returns:
- true if the procedure exists and has received the abort, otherwise false.
-
abort
Send an abort notification to the specified procedure. Depending on the procedure implementation, the abort can be considered or ignored.
- Parameters:
procId
- the procedure to abort
mayInterruptIfRunning
- if the proc completed at least one step, should it be aborted?
- Returns:
- true if the procedure exists and has received the abort, otherwise false.
-
getProcedure
-
getProcedure
-
getResult
-
isFinished
Return true if the procedure is finished. The state may be "completed successfully" or "failed and rolled back". Use getResult() to check the state or get the result data.
- Parameters:
procId
- the ID of the procedure to check
- Returns:
- true if the procedure execution is finished, otherwise false.
-
isStarted
Return true if the procedure is started.
- Parameters:
procId
- the ID of the procedure to check
- Returns:
- true if the procedure execution is started, otherwise false.
-
removeResult
Mark the specified completed procedure as ready to remove.
- Parameters:
procId
- the ID of the procedure to remove
-
getResultOrProcedure
-
isProcedureOwner
Check if the user is this procedure's owner.
- Parameters:
procId
- the target procedure
user
- the user
- Returns:
- true if the user is the owner of the procedure; false otherwise, or if the owner is unknown.
-
getActiveProceduresNoCopy
Should only be used when starting up, when the procedure workers have not been started. If the procedure workers have been started, the returned values may change while you are processing them, so this is usually not safe. Use getProcedures()
below for most cases, as it makes a copy and also includes the finished procedures. -
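The difference between a no-copy view and a copied list can be seen with a plain ConcurrentHashMap. This is a generic Java illustration under assumptions, not the executor's code: LiveViewVsCopySketch and its method names are hypothetical, and the sketch does not model the inclusion of finished procedures.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative only: a live view changes under the caller, a copy does not. */
final class LiveViewVsCopySketch {
  private final ConcurrentHashMap<Long, String> procedures = new ConcurrentHashMap<>();

  void add(long procId, String name) {
    procedures.put(procId, name);
  }

  /** Live view backed by the map; later insertions show up in it. */
  Collection<String> activeNoCopy() {
    return procedures.values();
  }

  /** Snapshot taken at call time; later insertions do not affect it. */
  List<String> copy() {
    return new ArrayList<>(procedures.values());
  }

  public static void main(String[] args) {
    LiveViewVsCopySketch sketch = new LiveViewVsCopySketch();
    sketch.add(1L, "first");
    Collection<String> live = sketch.activeNoCopy();
    List<String> snapshot = sketch.copy();
    sketch.add(2L, "second");
    System.out.println(live.size() + " live, " + snapshot.size() + " in snapshot");
  }
}
```

Once workers are running and mutating the map, only the snapshot gives the caller a stable set to iterate over.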
getProcedures
Get procedures.
- Returns:
- the procedures in a list
-
registerListener
-
unregisterListener
-
sendProcedureLoadedNotification
-
sendProcedureAddedNotification
-
sendProcedureFinishedNotification
-
nextProcId
-
getLastProcId
-
getActiveProcIds
-
getRootProcedureId
-
executeProcedure
-
acquireLock
-
releaseLock
-
getLockEntryForRollback
-
executeUnexpectedRollback
private void executeUnexpectedRollback(Procedure<TEnvironment> rootProc, RootProcedureState<TEnvironment> procStack) -
executeNormalRollback
private Procedure.LockState executeNormalRollback(Procedure<TEnvironment> rootProc, RootProcedureState<TEnvironment> procStack) -
executeRollback
private Procedure.LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) Execute the rollback of the full procedure stack. Once the procedure is rolled back, the root-procedure will be visible as finished to the user, and the result will be the fatal exception. -
cleanupAfterRollbackOneStep
-
executeRollback
Execute the rollback of the procedure step. It updates the store with the new state (stack index), or completely removes the procedure if it is a child. -
yieldProcedure
-
execProcedure
private void execProcedure(RootProcedureState<TEnvironment> procStack, Procedure<TEnvironment> procedure)
Executes procedure
- Calls the doExecute() of the procedure
- If the procedure execution didn't fail (i.e. valid user input)
  - ...and returned subprocedures
    - The subprocedures are initialized.
    - The subprocedures are added to the store
    - The subprocedures are added to the runnable queue
    - The procedure is now in a WAITING state, waiting for the subprocedures to complete
  - ...if there are no subprocedures
    - the procedure completed successfully
    - if there is a parent (WAITING)
    - the parent state will be set to RUNNABLE
- In case of failure
  - The store is updated with the new state
  - The executor (caller of this method) will start the rollback of the procedure
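The branching described for execProcedure can be sketched as a small state update. This is a hedged model, not the real method: ExecStepSketch, its Proc type, and the reduced State enum are hypothetical, the store is omitted entirely, and the pendingChildren counter (which makes a parent wait for all of its children) is an assumption added for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of the post-execution branching. NOT the HBase implementation. */
final class ExecStepSketch {
  enum State { RUNNABLE, WAITING, SUCCESS, FAILED }

  /** Minimal mutable procedure: a state, an optional parent, and a child counter. */
  static final class Proc {
    State state = State.RUNNABLE;
    final Proc parent;
    int pendingChildren = 0; // assumption: the parent resumes when this reaches zero

    Proc(Proc parent) {
      this.parent = parent;
    }
  }

  /** Applies the outcome of one execution step to the procedure and its parent. */
  static void afterExecute(Proc proc, boolean failed, List<Proc> subprocs, List<Proc> runnableQueue) {
    if (failed) {
      proc.state = State.FAILED; // the caller would start the rollback from here
      return;
    }
    if (!subprocs.isEmpty()) {
      proc.pendingChildren = subprocs.size();
      runnableQueue.addAll(subprocs); // children become runnable
      proc.state = State.WAITING;     // parent waits for them to complete
      return;
    }
    proc.state = State.SUCCESS;
    Proc parent = proc.parent;
    if (parent != null && parent.state == State.WAITING && --parent.pendingChildren == 0) {
      parent.state = State.RUNNABLE;  // last child finished, parent can continue
    }
  }

  public static void main(String[] args) {
    List<Proc> queue = new ArrayList<>();
    Proc parent = new Proc(null);
    Proc child = new Proc(parent);
    afterExecute(parent, false, Collections.singletonList(child), queue);
    System.out.println("parent after spawning child: " + parent.state);
    afterExecute(child, false, Collections.<Proc>emptyList(), queue);
    System.out.println("parent after child finished: " + parent.state);
  }
}
```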
-
kill
-
initializeChildren
private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack, Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) -
submitChildrenProcedures
-
countDownChildren
private void countDownChildren(RootProcedureState<TEnvironment> procStack, Procedure<TEnvironment> procedure) -
updateStoreOnExec
private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack, Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) -
handleInterruptedException
-
execCompletionCleanup
-
rootProcedureFinished
-
getProcStack
-
getProcedureScheduler
-
getCompletedSize
int getCompletedSize() -
getProcExecutionLock
-
getAsyncTaskExecutor
Get a thread pool for executing some asynchronous tasks
-