001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import edu.umd.cs.findbugs.annotations.Nullable; 021import java.io.IOException; 022import java.io.UncheckedIOException; 023import java.util.ArrayDeque; 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.Collection; 027import java.util.Comparator; 028import java.util.Deque; 029import java.util.HashSet; 030import java.util.List; 031import java.util.PriorityQueue; 032import java.util.Set; 033import java.util.concurrent.ConcurrentHashMap; 034import java.util.concurrent.CopyOnWriteArrayList; 035import java.util.concurrent.Executor; 036import java.util.concurrent.Executors; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicBoolean; 039import java.util.concurrent.atomic.AtomicInteger; 040import java.util.concurrent.atomic.AtomicLong; 041import java.util.stream.Collectors; 042import java.util.stream.Stream; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 046import org.apache.hadoop.hbase.log.HBaseMarkers; 
047import org.apache.hadoop.hbase.procedure2.Procedure.LockState; 048import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 049import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; 050import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; 051import org.apache.hadoop.hbase.procedure2.trace.ProcedureSpanBuilder; 052import org.apache.hadoop.hbase.procedure2.util.StringUtils; 053import org.apache.hadoop.hbase.security.User; 054import org.apache.hadoop.hbase.trace.TraceUtil; 055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 056import org.apache.hadoop.hbase.util.IdLock; 057import org.apache.hadoop.hbase.util.NonceKey; 058import org.apache.hadoop.hbase.util.Threads; 059import org.apache.yetus.audience.InterfaceAudience; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 064import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 065 066import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 067 068/** 069 * Thread Pool that executes the submitted procedures. The executor has a ProcedureStore associated. 070 * Each operation is logged and on restart the pending procedures are resumed. Unless the Procedure 071 * code throws an error (e.g. invalid user input) the procedure will complete (at some point in 072 * time), On restart the pending procedures are resumed and the once failed will be rolledback. 
The 073 * user can add procedures to the executor via submitProcedure(proc) check for the finished state 074 * via isFinished(procId) and get the result via getResult(procId) 075 */ 076@InterfaceAudience.Private 077public class ProcedureExecutor<TEnvironment> { 078 private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class); 079 080 public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set"; 081 private static final boolean DEFAULT_CHECK_OWNER_SET = false; 082 083 public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY = 084 "hbase.procedure.worker.keep.alive.time.msec"; 085 private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1); 086 087 public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl"; 088 static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min 089 090 public static final String EVICT_ACKED_TTL_CONF_KEY = "hbase.procedure.cleaner.acked.evict.ttl"; 091 static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min 092 093 /** 094 * {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to break PE 095 * having it fail at various junctures. When non-null, testing is set to an instance of the below 096 * internal {@link Testing} class with flags set for the particular test. 097 */ 098 volatile Testing testing = null; 099 100 /** 101 * Class with parameters describing how to fail/die when in testing-context. 102 */ 103 public static class Testing { 104 protected volatile boolean killIfHasParent = true; 105 protected volatile boolean killIfSuspended = false; 106 107 /** 108 * Kill the PE BEFORE we store state to the WAL. Good for figuring out if a Procedure is 109 * persisting all the state it needs to recover after a crash. 
110 */ 111 protected volatile boolean killBeforeStoreUpdate = false; 112 protected volatile boolean toggleKillBeforeStoreUpdate = false; 113 114 /** 115 * Set when we want to fail AFTER state has been stored into the WAL. Rarely used. HBASE-20978 116 * is about a case where memory-state was being set after store to WAL where a crash could cause 117 * us to get stuck. This flag allows killing at what was a vulnerable time. 118 */ 119 protected volatile boolean killAfterStoreUpdate = false; 120 protected volatile boolean toggleKillAfterStoreUpdate = false; 121 122 protected volatile boolean killBeforeStoreUpdateInRollback = false; 123 protected volatile boolean toggleKillBeforeStoreUpdateInRollback = false; 124 125 protected boolean shouldKillBeforeStoreUpdate() { 126 final boolean kill = this.killBeforeStoreUpdate; 127 if (this.toggleKillBeforeStoreUpdate) { 128 this.killBeforeStoreUpdate = !kill; 129 LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate); 130 } 131 return kill; 132 } 133 134 protected boolean shouldKillBeforeStoreUpdate(boolean isSuspended, boolean hasParent) { 135 if (isSuspended && !killIfSuspended) { 136 return false; 137 } 138 if (hasParent && !killIfHasParent) { 139 return false; 140 } 141 return shouldKillBeforeStoreUpdate(); 142 } 143 144 protected boolean shouldKillAfterStoreUpdate() { 145 final boolean kill = this.killAfterStoreUpdate; 146 if (this.toggleKillAfterStoreUpdate) { 147 this.killAfterStoreUpdate = !kill; 148 LOG.warn("Toggle KILL after store update to: " + this.killAfterStoreUpdate); 149 } 150 return kill; 151 } 152 153 protected boolean shouldKillAfterStoreUpdate(final boolean isSuspended) { 154 return (isSuspended && !killIfSuspended) ? 
false : shouldKillAfterStoreUpdate(); 155 } 156 157 protected boolean shouldKillBeforeStoreUpdateInRollback() { 158 final boolean kill = this.killBeforeStoreUpdateInRollback; 159 if (this.toggleKillBeforeStoreUpdateInRollback) { 160 this.killBeforeStoreUpdateInRollback = !kill; 161 LOG.warn("Toggle KILL before store update in rollback to: " 162 + this.killBeforeStoreUpdateInRollback); 163 } 164 return kill; 165 } 166 } 167 168 public interface ProcedureExecutorListener { 169 void procedureLoaded(long procId); 170 171 void procedureAdded(long procId); 172 173 void procedureFinished(long procId); 174 } 175 176 /** 177 * Map the the procId returned by submitProcedure(), the Root-ProcID, to the Procedure. Once a 178 * Root-Procedure completes (success or failure), the result will be added to this map. The user 179 * of ProcedureExecutor should call getResult(procId) to get the result. 180 */ 181 private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed = 182 new ConcurrentHashMap<>(); 183 184 /** 185 * Map the the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState. 186 * The RootProcedureState contains the execution stack of the Root-Procedure, It is added to the 187 * map by submitProcedure() and removed on procedure completion. 188 */ 189 private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack = 190 new ConcurrentHashMap<>(); 191 192 /** 193 * Helper map to lookup the live procedures by ID. This map contains every procedure. 194 * root-procedures and subprocedures. 195 */ 196 private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures = 197 new ConcurrentHashMap<>(); 198 199 /** 200 * Helper map to lookup whether the procedure already issued from the same client. This map 201 * contains every root procedure. 
202 */ 203 private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>(); 204 205 private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners = 206 new CopyOnWriteArrayList<>(); 207 208 private Configuration conf; 209 210 /** 211 * Created in the {@link #init(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing 212 * resource handling rather than observing in a #join is unexpected). Overridden when we do the 213 * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok). 214 */ 215 private ThreadGroup threadGroup; 216 217 /** 218 * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing 219 * resource handling rather than observing in a #join is unexpected). Overridden when we do the 220 * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok). 221 */ 222 private CopyOnWriteArrayList<WorkerThread> workerThreads; 223 224 /** 225 * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing 226 * resource handling rather than observing in a #join is unexpected). Overridden when we do the 227 * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok). 228 */ 229 private TimeoutExecutorThread<TEnvironment> timeoutExecutor; 230 231 /** 232 * WorkerMonitor check for stuck workers and new worker thread when necessary, for example if 233 * there is no worker to assign meta, it will new worker thread for it, so it is very important. 234 * TimeoutExecutor execute many tasks like DeadServerMetricRegionChore RegionInTransitionChore and 235 * so on, some tasks may execute for a long time so will block other tasks like WorkerMonitor, so 236 * use a dedicated thread for executing WorkerMonitor. 
237 */ 238 private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor; 239 240 private int corePoolSize; 241 private int maxPoolSize; 242 243 private volatile long keepAliveTime; 244 245 /** 246 * Scheduler/Queue that contains runnable procedures. 247 */ 248 private final ProcedureScheduler scheduler; 249 250 private final Executor forceUpdateExecutor = Executors.newSingleThreadExecutor( 251 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build()); 252 253 private final AtomicLong lastProcId = new AtomicLong(-1); 254 private final AtomicLong workerId = new AtomicLong(0); 255 private final AtomicInteger activeExecutorCount = new AtomicInteger(0); 256 private final AtomicBoolean running = new AtomicBoolean(false); 257 private final TEnvironment environment; 258 private final ProcedureStore store; 259 260 private final boolean checkOwnerSet; 261 262 // To prevent concurrent execution of the same procedure. 263 // For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the 264 // procedure is woken up before we finish the suspend which causes the same procedures to be 265 // executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also 266 // a bit confusing to the developers. So here we introduce this lock to prevent the concurrent 267 // execution of the same procedure. 
private final IdLock procExecutionLock = new IdLock();

  /**
   * Creates an executor that uses a {@link SimpleProcedureScheduler}.
   * @param conf        configuration to read executor settings from
   * @param environment environment passed to procedures (e.g. when restoring locks on load)
   * @param store       store used to persist procedures and to reload them on init
   */
  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
    final ProcedureStore store) {
    this(conf, environment, store, new SimpleProcedureScheduler());
  }

  /**
   * Returns true if the root procedure of {@code proc} is no longer live (absent from the
   * procedures map) or has finished.
   */
  private boolean isRootFinished(Procedure<?> proc) {
    Procedure<?> rootProc = procedures.get(proc.getRootProcId());
    return rootProc == null || rootProc.isFinished();
  }

  /**
   * Re-persists a single procedure in the store, as requested by the store's forceUpdate
   * callback. The per-procedure {@link #procExecutionLock} entry is held for the whole call.
   * <p/>
   * Nothing is written when the procedure is a finished sub-procedure whose root has finished,
   * when it is neither live nor completed, when its completed result is a {@link FailedProcedure},
   * or when its completed result has expired according to the eviction TTLs.
   * @param procId id of the procedure to re-persist
   * @throws IOException if acquiring the lock entry or updating the store throws
   */
  private void forceUpdateProcedure(long procId) throws IOException {
    IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
    try {
      Procedure<TEnvironment> proc = procedures.get(procId);
      if (proc != null) {
        if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) {
          LOG.debug("Procedure {} has already been finished and parent is succeeded,"
            + " skip force updating", proc);
          return;
        }
      } else {
        CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
        if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) {
          LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
          return;
        }
        long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
        long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
        if (retainer.isExpired(EnvironmentEdgeManager.currentTime(), evictTtl, evictAckTtl)) {
          LOG.debug("Procedure {} has already been finished and expired, skip force updating",
            procId);
          return;
        }
        proc = retainer.getProcedure();
      }
      LOG.debug("Force update procedure {}", proc);
      store.update(proc);
    } finally {
      procExecutionLock.releaseLockEntry(lockEntry);
    }
  }

  /**
   * Creates an executor.
   * @param conf        configuration; also applied through refreshConfiguration(Configuration)
   * @param environment environment passed to procedures
   * @param store       store used to persist and reload procedures; a listener is registered on
   *                    it so that store-requested force updates are re-persisted asynchronously
   * @param scheduler   scheduler/queue that holds the runnable procedures
   */
  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
    final ProcedureStore store, final ProcedureScheduler scheduler) {
    this.environment = environment;
    this.scheduler = scheduler;
    this.store = store;
    this.conf = conf;
    this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET);
    refreshConfiguration(conf);
    store.registerListener(new ProcedureStoreListener() {

      @Override
      public void forceUpdate(long[] procIds) {
        // Hand every id to the single force-update thread instead of doing the work inline.
        Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> {
          try {
            forceUpdateProcedure(procId);
          } catch (IOException e) {
            // Best effort: keep going with the other ids, but keep the cause in the log.
            LOG.warn("Failed to force update procedure with pid={}", procId, e);
          }
        }));
      }
    });
  }

  /**
   * Replays the procedure store into the in-memory state, which must be empty.
   * @param abortOnCorruption if true, fail when the store reports corrupted procedures
   * @throws IOException if loading fails, or corrupted procedures were found and
   *                     {@code abortOnCorruption} is set
   */
  private void load(final boolean abortOnCorruption) throws IOException {
    Preconditions.checkArgument(completed.isEmpty(), "completed not empty: %s", completed);
    Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty: %s",
      rollbackStack);
    Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty: %s", procedures);
    Preconditions.checkArgument(scheduler.size() == 0, "scheduler queue not empty: %s", scheduler);

    store.load(new ProcedureStore.ProcedureLoader() {
      @Override
      public void setMaxProcId(long maxProcId) {
        assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
        lastProcId.set(maxProcId);
      }

      @Override
      public void load(ProcedureIterator procIter) throws IOException {
        loadProcedures(procIter);
      }

      @Override
      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
        int corruptedCount = 0;
        while (procIter.hasNext()) {
          Procedure<?> proc = procIter.next();
          LOG.error("Corrupt {}", proc);
          corruptedCount++;
        }
        if (abortOnCorruption && corruptedCount > 0) {
          throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay");
        }
      }
    });
  }

  /** Restores the lock of {@code proc} and records its id in {@code restored}. */
  private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) {
    proc.restoreLock(getEnvironment());
    restored.add(proc.getProcId());
  }

  /** Pops {@code stack} until empty, restoring the lock of every popped procedure. */
  private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) {
    while (!stack.isEmpty()) {
      restoreLock(stack.pop(), restored);
    }
  }

  // Restore the locks for all the procedures.
  // Notice that we need to restore the locks starting from the root proc, otherwise there will be
  // a problem: a sub procedure may hold the exclusive lock first and then we are stuck when
  // acquiring the lock for the parent procedure.
  // The algorithm is straight-forward:
  // 1. Use a set to record the procedures whose locks have already been restored.
  // 2. Use a stack to store the hierarchy of the procedures.
  // 3. For each procedure, push it onto the stack and move on to its parent, until either
  //    a. the procedure has no parent, i.e. it is the root procedure (its lock is restored), or
  //    b. the lock has already been restored (by checking the set introduced in #1);
  //    then pop the stack and call restoreLock for each procedure, top-most ancestor first.
  // Notice that this should be done for all procedures, not only the ones in runnableList.
private void restoreLocks() {
    // Ids of the procedures whose lock has already been restored.
    Set<Long> restored = new HashSet<>();
    // Procedures collected while walking up towards the root; popped top-most ancestor first.
    Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>();
    procedures.values().forEach(proc -> {
      for (;;) {
        if (restored.contains(proc.getProcId())) {
          // This ancestor is already restored: restore the collected descendants.
          restoreLocks(stack, restored);
          return;
        }
        if (!proc.hasParent()) {
          // Reached the root: restore it first, then the collected descendants.
          restoreLock(proc, restored);
          restoreLocks(stack, restored);
          return;
        }
        stack.push(proc);
        // NOTE(review): assumes the parent was loaded into 'procedures'; if it were missing, proc
        // would become null here and the next iteration would throw an NPE.
        proc = procedures.get(proc.getParentProcId());
      }
    });
  }

  /**
   * Second pass over the loaded procedures. Entries the iterator reports as finished are skipped.
   * For every other procedure:
   * <ul>
   * <li>the parent's children latch is incremented when the procedure is unfinished;</li>
   * <li>the procedure is loaded into its root's {@link RootProcedureState};</li>
   * <li>it is sorted into the list matching its state.</li>
   * </ul>
   * Afterwards each root's rollback support is decided.
   * @param procIter           store iterator; reset before iterating
   * @param runnableList       receives RUNNABLE procedures
   * @param failedList         receives FAILED procedures
   * @param waitingList        receives WAITING procedures
   * @param waitingTimeoutList receives WAITING_TIMEOUT procedures
   * @throws IOException                   declared for the store iterator
   * @throws UnsupportedOperationException if a procedure is in ROLLEDBACK or INITIALIZING state
   */
  private void initializeStacks(ProcedureIterator procIter,
    List<Procedure<TEnvironment>> runnableList, List<Procedure<TEnvironment>> failedList,
    List<Procedure<TEnvironment>> waitingList, List<Procedure<TEnvironment>> waitingTimeoutList)
    throws IOException {
    procIter.reset();
    while (procIter.hasNext()) {
      if (procIter.isNextFinished()) {
        procIter.skipNext();
        continue;
      }

      @SuppressWarnings("unchecked")
      Procedure<TEnvironment> proc = procIter.next();
      assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
      LOG.debug("Loading {}", proc);
      Long rootProcId = getRootProcedureId(proc);
      // The orphan procedures will be passed to handleCorrupted, so add an assert here
      assert rootProcId != null;

      if (proc.hasParent()) {
        Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId());
        if (parent != null && !proc.isFinished()) {
          parent.incChildrenLatch();
        }
      }

      // A RootProcedureState was created for every unfinished root in loadProcedures step 1.
      RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
      procStack.loadStack(proc);

      proc.setRootProcId(rootProcId);
      switch (proc.getState()) {
        case RUNNABLE:
          runnableList.add(proc);
          break;
        case WAITING:
          waitingList.add(proc);
          break;
        case WAITING_TIMEOUT:
          waitingTimeoutList.add(proc);
          break;
        case FAILED:
          failedList.add(proc);
          break;
        case ROLLEDBACK:
        case INITIALIZING:
          String msg = "Unexpected " + proc.getState() + " state for " + proc;
          LOG.error(msg);
          throw new UnsupportedOperationException(msg);
        default:
          break;
      }
    }
    rollbackStack.forEach((rootProcId, procStack) -> {
      if (procStack.getSubproceduresStack() != null) {
        // if we have already recorded some stack ids, it means we support rollback
        procStack.setRollbackSupported(true);
      } else {
        // otherwise, test the root procedure to see if we support rollback
        procStack.setRollbackSupported(procedures.get(rootProcId).isRollbackSupported());
      }
    });
  }

  /**
   * WAITING procedures without children are switched to RUNNABLE and appended to
   * {@code runnableList}. The others only get afterReplay and keep waiting for their children.
   */
  private void processWaitingProcedures(List<Procedure<TEnvironment>> waitingList,
    List<Procedure<TEnvironment>> runnableList) {
    waitingList.forEach(proc -> {
      if (!proc.hasChildren()) {
        // Normally, WAITING procedures should be woken up by their children. But there is a case
        // where all the children are successful and, before they can wake up their parent
        // procedure, the master was killed. So, during recovering the procedures from
        // ProcedureWal, its children are not loaded because of their SUCCESS state. So we need to
        // continue to run this WAITING procedure. But before executing, we need to set its state
        // to RUNNABLE, otherwise an exception will be thrown:
        // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
        // "NOT RUNNABLE! " + procedure.toString());
        proc.setState(ProcedureState.RUNNABLE);
        runnableList.add(proc);
      } else {
        proc.afterReplay(getEnvironment());
      }
    });
  }

  /** Calls afterReplay on each WAITING_TIMEOUT procedure and hands it to the timeout executor. */
  private void processWaitingTimeoutProcedures(List<Procedure<TEnvironment>> waitingTimeoutList) {
    waitingTimeoutList.forEach(proc -> {
      proc.afterReplay(getEnvironment());
      timeoutExecutor.add(proc);
    });
  }

  /**
   * Enqueues the loaded procedures on the scheduler: FAILED ones first (these go on to roll back),
   * then RUNNABLE ones after afterReplay. A loaded notification is sent for root procedures.
   */
  private void pushProceduresAfterLoad(List<Procedure<TEnvironment>> runnableList,
    List<Procedure<TEnvironment>> failedList) {
    failedList.forEach(scheduler::addBack);
    runnableList.forEach(p -> {
      p.afterReplay(getEnvironment());
      if (!p.hasParent()) {
        sendProcedureLoadedNotification(p.getProcId());
      }
      scheduler.addBack(p);
    });
  }

  /**
   * Rebuilds the in-memory state from the store iterator. Steps 1-6 are commented inline; their
   * order matters (see step 2 in particular).
   * @param procIter iterator over the procedures in the store; it is walked twice
   * @throws IOException declared for the store iterator
   */
  private void loadProcedures(ProcedureIterator procIter) throws IOException {
    // 1. Build the rollback stack. First pass:
    //    - completed procedures go to 'completed';
    //    - every unfinished root gets a RootProcedureState;
    //    - unfinished procedures are added to 'procedures' and counted per state (the counts
    //      presize the lists below);
    //    - nonce keys are registered.
    int runnableCount = 0;
    int failedCount = 0;
    int waitingCount = 0;
    int waitingTimeoutCount = 0;
    while (procIter.hasNext()) {
      boolean finished = procIter.isNextFinished();
      @SuppressWarnings("unchecked")
      Procedure<TEnvironment> proc = procIter.next();
      NonceKey nonceKey = proc.getNonceKey();
      long procId = proc.getProcId();

      if (finished) {
        completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc));
        LOG.debug("Completed {}", proc);
      } else {
        if (!proc.hasParent()) {
          assert !proc.isFinished() : "unexpected finished procedure";
          rollbackStack.put(proc.getProcId(), new RootProcedureState<>());
        }

        // add the procedure to the map
        proc.beforeReplay(getEnvironment());
        procedures.put(proc.getProcId(), proc);
        switch (proc.getState()) {
          case RUNNABLE:
            runnableCount++;
            break;
          case FAILED:
            failedCount++;
            break;
          case WAITING:
            waitingCount++;
            break;
          case WAITING_TIMEOUT:
            waitingTimeoutCount++;
            break;
          default:
            break;
        }
      }

      if (nonceKey != null) {
        nonceKeysToProcIdsMap.put(nonceKey, procId); // add the nonce to the map
      }
    }

    // 2. Initialize the stacks. In the old implementation, procedures in FAILED state were pushed
    // into the ProcedureScheduler directly to execute the rollback. That no longer works since we
    // introduced the restore lock stage. Now, when we acquire an xlock, we remove the queue from
    // runQueue in the scheduler. Later, when a procedure with lock access is pushed into the
    // scheduler (for example, a sub procedure of the procedure holding the xlock), we add the
    // queue back so the workers can poll from it. The assumption is that the procedure holding
    // the xlock has already been polled out. So when loading, we can not add the procedure to the
    // scheduler first and then call acquireLock: the procedure would still be in the queue, the
    // queue would be removed from runQueue, no one could poll it out, and we would deadlock.
    List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);
    List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
    List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);
    List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount);

    initializeStacks(procIter, runnableList, failedList, waitingList, waitingTimeoutList);

    // 3. Check the waiting procedures to see if some of them can be added to runnable.
    processWaitingProcedures(waitingList, runnableList);

    // 4. restore locks
    restoreLocks();

    // 5. Push the procedures to the timeout executor
    processWaitingTimeoutProcedures(waitingTimeoutList);

    // 6. Push the procedure to the scheduler
    pushProceduresAfterLoad(runnableList, failedList);
    // After all procedures put into the queue, signal the worker threads.
    // Otherwise, there is a race condition. See HBASE-21364.
    scheduler.signalAll();
  }

  /**
   * Initialize the procedure executor, but do not start workers. We will start them later.
   * <p/>
   * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
   * ensure a single executor, and start the procedure replay to resume and recover the previous
   * pending and in-progress procedures.
   * @param numThreads        number of threads available for procedure execution.
   * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
   *                          is found on replay. otherwise false.
   */
  public void init(int numThreads, boolean abortOnCorruption) throws IOException {
    // We have numThreads core workers (up to 10x that as burst max) plus two timer threads
    // created below: "ProcExecTimeout" and the dedicated "WorkerMonitor".
    // NOTE(review): the log text below mentions "bigger of cpus/4 or 16", which is computed by the
    // caller, not here; confirm it still matches.
    this.corePoolSize = numThreads;
    this.maxPoolSize = 10 * numThreads;
    LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}",
      corePoolSize, maxPoolSize);

    this.threadGroup = new ThreadGroup("PEWorkerGroup");
    this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout");
    this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor");

    // Create the workers (they are only started in startWorkers()).
    workerId.set(0);
    workerThreads = new CopyOnWriteArrayList<>();
    for (int i = 0; i < corePoolSize; ++i) {
      workerThreads.add(new WorkerThread(threadGroup));
    }

    long st, et;

    // Acquire the store lease.
    st = System.nanoTime();
    store.recoverLease();
    et = System.nanoTime();
    LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(),
      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));

    // start the procedure scheduler
    scheduler.start();

    // TODO: Split in two steps.
    // TODO: Handle corrupted procedures (currently just a warn)
    // The first one will make sure that we have the latest id,
    // so we can start the threads and accept new procedures.
    // The second step will do the actual load of old procedures.
    st = System.nanoTime();
    load(abortOnCorruption);
    et = System.nanoTime();
    LOG.info("Loaded {} in {}", store.getClass().getSimpleName(),
      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
  }

  /**
   * Start the workers: the timeout thread, the worker-monitor thread and the core worker threads.
   * Then register the WorkerMonitor and the completed-procedure cleaner chores. Logs a warning
   * and does nothing if already running.
   */
  public void startWorkers() throws IOException {
    if (!running.compareAndSet(false, true)) {
      LOG.warn("Already running");
      return;
    }
    // Start the executors. Here we must have the lastProcId set.
    LOG.trace("Start workers {}", workerThreads.size());
    timeoutExecutor.start();
    workerMonitorExecutor.start();
    for (WorkerThread worker : workerThreads) {
      worker.start();
    }

    // Internal chores
    workerMonitorExecutor.add(new WorkerMonitor());

    // Add completed cleaner chore
    addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,
      nonceKeysToProcIdsMap));
  }

  /**
   * Signals shutdown: stops the scheduler and sends the stop signal to the timeout and
   * worker-monitor threads. Does not wait for them; see {@link #join()}. No-op if not running.
   */
  public void stop() {
    if (!running.getAndSet(false)) {
      return;
    }

    LOG.info("Stopping");
    scheduler.stop();
    timeoutExecutor.sendStopSignal();
    workerMonitorExecutor.sendStopSignal();
  }

  /**
   * Waits for the timeout, worker-monitor and worker threads to terminate. Then destroys the
   * thread group and clears all in-memory state. Expected to be called after {@link #stop()}.
   */
  public void join() {
    assert !isRunning() : "expected not running";

    // wait for the timeout executor to terminate
    timeoutExecutor.awaitTermination();
    // wait for the worker monitor executor to terminate
    workerMonitorExecutor.awaitTermination();

    // wait for the worker threads to terminate
    for (WorkerThread worker : workerThreads) {
      worker.awaitTermination();
    }

    // Destroy the Thread Group for the executors
    // TODO: Fix. #join is not place to destroy resources.
    // NOTE(review): ThreadGroup#destroy is deprecated in recent JDKs; confirm the target JDK.
    try {
      threadGroup.destroy();
    } catch (IllegalThreadStateException e) {
      LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT", this.threadGroup,
        e.getMessage());
      // This dumps list of threads on STDOUT.
      this.threadGroup.list();
    }

    // reset the in-memory state for testing
    completed.clear();
    rollbackStack.clear();
    procedures.clear();
    nonceKeysToProcIdsMap.clear();
    scheduler.clear();
    lastProcId.set(-1);
  }

  /** Stores {@code conf} and re-applies the worker keep-alive time from it. */
  public void refreshConfiguration(final Configuration conf) {
    this.conf = conf;
    setKeepAliveTime(conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY, DEFAULT_WORKER_KEEP_ALIVE_TIME),
      TimeUnit.MILLISECONDS);
  }

  // ==========================================================================
  // Accessors
  // ==========================================================================
  /** Returns whether {@link #startWorkers()} has run and {@link #stop()} has not been called. */
  public boolean isRunning() {
    return running.get();
  }

  /** Returns the current number of worker threads. */
  public int getWorkerThreadCount() {
    return workerThreads.size();
  }

  /** Returns the core pool size settings.
*/ 736 public int getCorePoolSize() { 737 return corePoolSize; 738 } 739 740 public int getActiveExecutorCount() { 741 return activeExecutorCount.get(); 742 } 743 744 public TEnvironment getEnvironment() { 745 return this.environment; 746 } 747 748 public ProcedureStore getStore() { 749 return this.store; 750 } 751 752 ProcedureScheduler getScheduler() { 753 return scheduler; 754 } 755 756 public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) { 757 this.keepAliveTime = timeUnit.toMillis(keepAliveTime); 758 this.scheduler.signalAll(); 759 } 760 761 public long getKeepAliveTime(final TimeUnit timeUnit) { 762 return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS); 763 } 764 765 // ========================================================================== 766 // Submit/Remove Chores 767 // ========================================================================== 768 769 /** 770 * Add a chore procedure to the executor 771 * @param chore the chore to add 772 */ 773 public void addChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore) { 774 if (chore == null) { 775 return; 776 } 777 chore.setState(ProcedureState.WAITING_TIMEOUT); 778 timeoutExecutor.add(chore); 779 } 780 781 /** 782 * Remove a chore procedure from the executor 783 * @param chore the chore to remove 784 * @return whether the chore is removed, or it will be removed later 785 */ 786 public boolean removeChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore) { 787 if (chore == null) { 788 return true; 789 } 790 chore.setState(ProcedureState.SUCCESS); 791 return timeoutExecutor.remove(chore); 792 } 793 794 // ========================================================================== 795 // Nonce Procedure helpers 796 // ========================================================================== 797 /** 798 * Create a NonceKey from the specified nonceGroup and nonce. 
799 * @param nonceGroup the group to use for the {@link NonceKey} 800 * @param nonce the nonce to use in the {@link NonceKey} 801 * @return the generated NonceKey 802 */ 803 public NonceKey createNonceKey(final long nonceGroup, final long nonce) { 804 return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce); 805 } 806 807 /** 808 * Register a nonce for a procedure that is going to be submitted. A procId will be reserved and 809 * on submitProcedure(), the procedure with the specified nonce will take the reserved ProcId. If 810 * someone already reserved the nonce, this method will return the procId reserved, otherwise an 811 * invalid procId will be returned. and the caller should procede and submit the procedure. 812 * @param nonceKey A unique identifier for this operation from the client or process. 813 * @return the procId associated with the nonce, if any otherwise an invalid procId. 814 */ 815 public long registerNonce(final NonceKey nonceKey) { 816 if (nonceKey == null) { 817 return -1; 818 } 819 820 // check if we have already a Reserved ID for the nonce 821 Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey); 822 if (oldProcId == null) { 823 // reserve a new Procedure ID, this will be associated with the nonce 824 // and the procedure submitted with the specified nonce will use this ID. 825 final long newProcId = nextProcId(); 826 oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId); 827 if (oldProcId == null) { 828 return -1; 829 } 830 } 831 832 // we found a registered nonce, but the procedure may not have been submitted yet. 833 // since the client expect the procedure to be submitted, spin here until it is. 
834 final boolean traceEnabled = LOG.isTraceEnabled(); 835 while ( 836 isRunning() && !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) 837 && nonceKeysToProcIdsMap.containsKey(nonceKey) 838 ) { 839 if (traceEnabled) { 840 LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted"); 841 } 842 Threads.sleep(100); 843 } 844 return oldProcId.longValue(); 845 } 846 847 /** 848 * Remove the NonceKey if the procedure was not submitted to the executor. 849 * @param nonceKey A unique identifier for this operation from the client or process. 850 */ 851 public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) { 852 if (nonceKey == null) { 853 return; 854 } 855 856 final Long procId = nonceKeysToProcIdsMap.get(nonceKey); 857 if (procId == null) { 858 return; 859 } 860 861 // if the procedure was not submitted, remove the nonce 862 if (!(procedures.containsKey(procId) || completed.containsKey(procId))) { 863 nonceKeysToProcIdsMap.remove(nonceKey); 864 } 865 } 866 867 /** 868 * If the failure failed before submitting it, we may want to give back the same error to the 869 * requests with the same nonceKey. 
870 * @param nonceKey A unique identifier for this operation from the client or process 871 * @param procName name of the procedure, used to inform the user 872 * @param procOwner name of the owner of the procedure, used to inform the user 873 * @param exception the failure to report to the user 874 */ 875 public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner, 876 IOException exception) { 877 if (nonceKey == null) { 878 return; 879 } 880 881 Long procId = nonceKeysToProcIdsMap.get(nonceKey); 882 if (procId == null || completed.containsKey(procId)) { 883 return; 884 } 885 886 completed.computeIfAbsent(procId, (key) -> { 887 Procedure<TEnvironment> proc = 888 new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception); 889 890 return new CompletedProcedureRetainer<>(proc); 891 }); 892 } 893 894 // ========================================================================== 895 // Submit/Abort Procedure 896 // ========================================================================== 897 /** 898 * Add a new root-procedure to the executor. 899 * @param proc the new procedure to execute. 900 * @return the procedure id, that can be used to monitor the operation 901 */ 902 public long submitProcedure(Procedure<TEnvironment> proc) { 903 return submitProcedure(proc, null); 904 } 905 906 /** 907 * Bypass a procedure. If the procedure is set to bypass, all the logic in execute/rollback will 908 * be ignored and it will return success, whatever. It is used to recover buggy stuck procedures, 909 * releasing the lock resources and letting other procedures run. Bypassing one procedure (and its 910 * ancestors will be bypassed automatically) may leave the cluster in a middle state, e.g. region 911 * not assigned, or some hdfs files left behind. After getting rid of those stuck procedures, the 912 * operators may have to do some clean up on hdfs or schedule some assign procedures to let region 913 * online. 
DO AT YOUR OWN RISK. 914 * <p> 915 * A procedure can be bypassed only if 1. The procedure is in state of RUNNABLE, WAITING, 916 * WAITING_TIMEOUT or it is a root procedure without any child. 2. No other worker thread is 917 * executing it 3. No child procedure has been submitted 918 * <p> 919 * If all the requirements are meet, the procedure and its ancestors will be bypassed and 920 * persisted to WAL. 921 * <p> 922 * If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue. TODO: What 923 * about WAITING_TIMEOUT? 924 * @param pids the procedure id 925 * @param lockWait time to wait lock 926 * @param force if force set to true, we will bypass the procedure even if it is executing. 927 * This is for procedures which can't break out during executing(due to bug, 928 * mostly) In this case, bypassing the procedure is not enough, since it is 929 * already stuck there. We need to restart the master after bypassing, and 930 * letting the problematic procedure to execute wth bypass=true, so in that 931 * condition, the procedure can be successfully bypassed. 932 * @param recursive We will do an expensive search for children of each pid. EXPENSIVE! 
933 * @return true if bypass success 934 * @throws IOException IOException 935 */ 936 public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force, 937 boolean recursive) throws IOException { 938 List<Boolean> result = new ArrayList<Boolean>(pids.size()); 939 for (long pid : pids) { 940 result.add(bypassProcedure(pid, lockWait, force, recursive)); 941 } 942 return result; 943 } 944 945 boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive) 946 throws IOException { 947 Preconditions.checkArgument(lockWait > 0, "lockWait should be positive"); 948 final Procedure<TEnvironment> procedure = getProcedure(pid); 949 if (procedure == null) { 950 LOG.debug("Procedure pid={} does not exist, skipping bypass", pid); 951 return false; 952 } 953 954 LOG.debug("Begin bypass {} with lockWait={}, override={}, recursive={}", procedure, lockWait, 955 override, recursive); 956 IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait); 957 if (lockEntry == null && !override) { 958 LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}", lockWait, 959 procedure, override); 960 return false; 961 } else if (lockEntry == null) { 962 LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}", lockWait, 963 procedure, override); 964 } 965 try { 966 // check whether the procedure is already finished 967 if (procedure.isFinished()) { 968 LOG.debug("{} is already finished, skipping bypass", procedure); 969 return false; 970 } 971 972 if (procedure.hasChildren()) { 973 if (recursive) { 974 // EXPENSIVE. Checks each live procedure of which there could be many!!! 975 // Is there another way to get children of a procedure? 976 LOG.info("Recursive bypass on children of pid={}", procedure.getProcId()); 977 this.procedures.forEachValue(1 /* Single-threaded */, 978 // Transformer 979 v -> v.getParentProcId() == procedure.getProcId() ? 
v : null, 980 // Consumer 981 v -> { 982 try { 983 bypassProcedure(v.getProcId(), lockWait, override, recursive); 984 } catch (IOException e) { 985 LOG.warn("Recursive bypass of pid={}", v.getProcId(), e); 986 } 987 }); 988 } else { 989 LOG.debug("{} has children, skipping bypass", procedure); 990 return false; 991 } 992 } 993 994 // If the procedure has no parent or no child, we are safe to bypass it in whatever state 995 if ( 996 procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE 997 && procedure.getState() != ProcedureState.WAITING 998 && procedure.getState() != ProcedureState.WAITING_TIMEOUT 999 ) { 1000 LOG.debug("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states " 1001 + "(with no parent), {}", procedure); 1002 // Question: how is the bypass done here? 1003 return false; 1004 } 1005 1006 // Now, the procedure is not finished, and no one can execute it since we take the lock now 1007 // And we can be sure that its ancestor is not running too, since their child has not 1008 // finished yet 1009 Procedure<TEnvironment> current = procedure; 1010 while (current != null) { 1011 LOG.debug("Bypassing {}", current); 1012 current.bypass(getEnvironment()); 1013 store.update(current); 1014 long parentID = current.getParentProcId(); 1015 current = getProcedure(parentID); 1016 } 1017 1018 // wake up waiting procedure, already checked there is no child 1019 if (procedure.getState() == ProcedureState.WAITING) { 1020 procedure.setState(ProcedureState.RUNNABLE); 1021 store.update(procedure); 1022 } 1023 1024 // If state of procedure is WAITING_TIMEOUT, we can directly submit it to the scheduler. 
1025 // Instead we should remove it from timeout Executor queue and tranfer its state to RUNNABLE 1026 if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { 1027 LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure); 1028 if (timeoutExecutor.remove(procedure)) { 1029 LOG.debug("removed procedure {} from timeoutExecutor", procedure); 1030 timeoutExecutor.executeTimedoutProcedure(procedure); 1031 } 1032 } else if (lockEntry != null) { 1033 scheduler.addFront(procedure); 1034 LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure); 1035 } else { 1036 // If we don't have the lock, we can't re-submit the queue, 1037 // since it is already executing. To get rid of the stuck situation, we 1038 // need to restart the master. With the procedure set to bypass, the procedureExecutor 1039 // will bypass it and won't get stuck again. 1040 LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, " 1041 + "skipping add to queue", procedure); 1042 } 1043 return true; 1044 1045 } finally { 1046 if (lockEntry != null) { 1047 procExecutionLock.releaseLockEntry(lockEntry); 1048 } 1049 } 1050 } 1051 1052 /** 1053 * Add a new root-procedure to the executor. 1054 * @param proc the new procedure to execute. 1055 * @param nonceKey the registered unique identifier for this operation from the client or process. 
1056 * @return the procedure id, that can be used to monitor the operation 1057 */ 1058 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH", 1059 justification = "FindBugs is blind to the check-for-null") 1060 public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) { 1061 Preconditions.checkArgument(lastProcId.get() >= 0); 1062 1063 prepareProcedure(proc); 1064 1065 final Long currentProcId; 1066 if (nonceKey != null) { 1067 currentProcId = nonceKeysToProcIdsMap.get(nonceKey); 1068 Preconditions.checkArgument(currentProcId != null, 1069 "Expected nonceKey=" + nonceKey + " to be reserved, use registerNonce(); proc=" + proc); 1070 } else { 1071 currentProcId = nextProcId(); 1072 } 1073 1074 // Initialize the procedure 1075 proc.setNonceKey(nonceKey); 1076 proc.setProcId(currentProcId.longValue()); 1077 1078 // Commit the transaction 1079 store.insert(proc, null); 1080 LOG.debug("Stored {}", proc); 1081 1082 // Add the procedure to the executor 1083 return pushProcedure(proc); 1084 } 1085 1086 /** 1087 * Add a set of new root-procedure to the executor. 1088 * @param procs the new procedures to execute. 1089 */ 1090 // TODO: Do we need to take nonces here? 
1091 public void submitProcedures(Procedure<TEnvironment>[] procs) { 1092 Preconditions.checkArgument(lastProcId.get() >= 0); 1093 if (procs == null || procs.length <= 0) { 1094 return; 1095 } 1096 1097 // Prepare procedure 1098 for (int i = 0; i < procs.length; ++i) { 1099 prepareProcedure(procs[i]).setProcId(nextProcId()); 1100 } 1101 1102 // Commit the transaction 1103 store.insert(procs); 1104 if (LOG.isDebugEnabled()) { 1105 LOG.debug("Stored " + Arrays.toString(procs)); 1106 } 1107 1108 // Add the procedure to the executor 1109 for (int i = 0; i < procs.length; ++i) { 1110 pushProcedure(procs[i]); 1111 } 1112 } 1113 1114 private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) { 1115 Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); 1116 Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc); 1117 if (this.checkOwnerSet) { 1118 Preconditions.checkArgument(proc.hasOwner(), "missing owner"); 1119 } 1120 return proc; 1121 } 1122 1123 private long pushProcedure(Procedure<TEnvironment> proc) { 1124 final long currentProcId = proc.getProcId(); 1125 1126 // Update metrics on start of a procedure 1127 proc.updateMetricsOnSubmit(getEnvironment()); 1128 1129 // Create the rollback stack for the procedure 1130 RootProcedureState<TEnvironment> stack = new RootProcedureState<>(); 1131 stack.setRollbackSupported(proc.isRollbackSupported()); 1132 rollbackStack.put(currentProcId, stack); 1133 1134 // Submit the new subprocedures 1135 assert !procedures.containsKey(currentProcId); 1136 procedures.put(currentProcId, proc); 1137 sendProcedureAddedNotification(currentProcId); 1138 scheduler.addBack(proc); 1139 return proc.getProcId(); 1140 } 1141 1142 /** 1143 * Send an abort notification the specified procedure. Depending on the procedure implementation 1144 * the abort can be considered or ignored. 
1145 * @param procId the procedure to abort 1146 * @return true if the procedure exists and has received the abort, otherwise false. 1147 */ 1148 public boolean abort(long procId) { 1149 return abort(procId, true); 1150 } 1151 1152 /** 1153 * Send an abort notification to the specified procedure. Depending on the procedure 1154 * implementation, the abort can be considered or ignored. 1155 * @param procId the procedure to abort 1156 * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted? 1157 * @return true if the procedure exists and has received the abort, otherwise false. 1158 */ 1159 public boolean abort(long procId, boolean mayInterruptIfRunning) { 1160 Procedure<TEnvironment> proc = procedures.get(procId); 1161 if (proc != null) { 1162 if (!mayInterruptIfRunning && proc.wasExecuted()) { 1163 return false; 1164 } 1165 return proc.abort(getEnvironment()); 1166 } 1167 return false; 1168 } 1169 1170 // ========================================================================== 1171 // Executor query helpers 1172 // ========================================================================== 1173 public Procedure<TEnvironment> getProcedure(final long procId) { 1174 return procedures.get(procId); 1175 } 1176 1177 public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) { 1178 Procedure<TEnvironment> proc = getProcedure(procId); 1179 if (clazz.isInstance(proc)) { 1180 return clazz.cast(proc); 1181 } 1182 return null; 1183 } 1184 1185 public Procedure<TEnvironment> getResult(long procId) { 1186 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1187 if (retainer == null) { 1188 return null; 1189 } else { 1190 return retainer.getProcedure(); 1191 } 1192 } 1193 1194 /** 1195 * Return true if the procedure is finished. The state may be "completed successfully" or "failed 1196 * and rolledback". Use getResult() to check the state or get the result data. 
1197 * @param procId the ID of the procedure to check 1198 * @return true if the procedure execution is finished, otherwise false. 1199 */ 1200 public boolean isFinished(final long procId) { 1201 return !procedures.containsKey(procId); 1202 } 1203 1204 /** 1205 * Return true if the procedure is started. 1206 * @param procId the ID of the procedure to check 1207 * @return true if the procedure execution is started, otherwise false. 1208 */ 1209 public boolean isStarted(long procId) { 1210 Procedure<?> proc = procedures.get(procId); 1211 if (proc == null) { 1212 return completed.get(procId) != null; 1213 } 1214 return proc.wasExecuted(); 1215 } 1216 1217 /** 1218 * Mark the specified completed procedure, as ready to remove. 1219 * @param procId the ID of the procedure to remove 1220 */ 1221 public void removeResult(long procId) { 1222 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1223 if (retainer == null) { 1224 assert !procedures.containsKey(procId) : "pid=" + procId + " is still running"; 1225 LOG.debug("pid={} already removed by the cleaner.", procId); 1226 return; 1227 } 1228 1229 // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired. 1230 retainer.setClientAckTime(EnvironmentEdgeManager.currentTime()); 1231 } 1232 1233 public Procedure<TEnvironment> getResultOrProcedure(long procId) { 1234 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1235 if (retainer == null) { 1236 return procedures.get(procId); 1237 } else { 1238 return retainer.getProcedure(); 1239 } 1240 } 1241 1242 /** 1243 * Check if the user is this procedure's owner 1244 * @param procId the target procedure 1245 * @param user the user 1246 * @return true if the user is the owner of the procedure, false otherwise or the owner is 1247 * unknown. 
1248 */ 1249 public boolean isProcedureOwner(long procId, User user) { 1250 if (user == null) { 1251 return false; 1252 } 1253 final Procedure<TEnvironment> runningProc = procedures.get(procId); 1254 if (runningProc != null) { 1255 return runningProc.getOwner().equals(user.getShortName()); 1256 } 1257 1258 final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1259 if (retainer != null) { 1260 return retainer.getProcedure().getOwner().equals(user.getShortName()); 1261 } 1262 1263 // Procedure either does not exist or has already completed and got cleaned up. 1264 // At this time, we cannot check the owner of the procedure 1265 return false; 1266 } 1267 1268 /** 1269 * Should only be used when starting up, where the procedure workers have not been started. 1270 * <p/> 1271 * If the procedure works has been started, the return values maybe changed when you are 1272 * processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as 1273 * it will do a copy, and also include the finished procedures. 1274 */ 1275 public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() { 1276 return procedures.values(); 1277 } 1278 1279 /** 1280 * Get procedures. 1281 * @return the procedures in a list 1282 */ 1283 public List<Procedure<TEnvironment>> getProcedures() { 1284 List<Procedure<TEnvironment>> procedureList = 1285 new ArrayList<>(procedures.size() + completed.size()); 1286 procedureList.addAll(procedures.values()); 1287 // Note: The procedure could show up twice in the list with different state, as 1288 // it could complete after we walk through procedures list and insert into 1289 // procedureList - it is ok, as we will use the information in the Procedure 1290 // to figure it out; to prevent this would increase the complexity of the logic. 
1291 completed.values().stream().map(CompletedProcedureRetainer::getProcedure) 1292 .forEach(procedureList::add); 1293 return procedureList; 1294 } 1295 1296 // ========================================================================== 1297 // Listeners helpers 1298 // ========================================================================== 1299 public void registerListener(ProcedureExecutorListener listener) { 1300 this.listeners.add(listener); 1301 } 1302 1303 public boolean unregisterListener(ProcedureExecutorListener listener) { 1304 return this.listeners.remove(listener); 1305 } 1306 1307 private void sendProcedureLoadedNotification(final long procId) { 1308 if (!this.listeners.isEmpty()) { 1309 for (ProcedureExecutorListener listener : this.listeners) { 1310 try { 1311 listener.procedureLoaded(procId); 1312 } catch (Throwable e) { 1313 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1314 } 1315 } 1316 } 1317 } 1318 1319 private void sendProcedureAddedNotification(final long procId) { 1320 if (!this.listeners.isEmpty()) { 1321 for (ProcedureExecutorListener listener : this.listeners) { 1322 try { 1323 listener.procedureAdded(procId); 1324 } catch (Throwable e) { 1325 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1326 } 1327 } 1328 } 1329 } 1330 1331 private void sendProcedureFinishedNotification(final long procId) { 1332 if (!this.listeners.isEmpty()) { 1333 for (ProcedureExecutorListener listener : this.listeners) { 1334 try { 1335 listener.procedureFinished(procId); 1336 } catch (Throwable e) { 1337 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1338 } 1339 } 1340 } 1341 } 1342 1343 // ========================================================================== 1344 // Procedure IDs helpers 1345 // ========================================================================== 1346 private long nextProcId() { 1347 long procId = lastProcId.incrementAndGet(); 1348 if (procId < 0) { 
1349 while (!lastProcId.compareAndSet(procId, 0)) { 1350 procId = lastProcId.get(); 1351 if (procId >= 0) { 1352 break; 1353 } 1354 } 1355 while (procedures.containsKey(procId)) { 1356 procId = lastProcId.incrementAndGet(); 1357 } 1358 } 1359 assert procId >= 0 : "Invalid procId " + procId; 1360 return procId; 1361 } 1362 1363 protected long getLastProcId() { 1364 return lastProcId.get(); 1365 } 1366 1367 public Set<Long> getActiveProcIds() { 1368 return procedures.keySet(); 1369 } 1370 1371 Long getRootProcedureId(Procedure<TEnvironment> proc) { 1372 return Procedure.getRootProcedureId(procedures, proc); 1373 } 1374 1375 // ========================================================================== 1376 // Executions 1377 // ========================================================================== 1378 private void executeProcedure(Procedure<TEnvironment> proc) { 1379 if (proc.isFinished()) { 1380 LOG.debug("{} is already finished, skipping execution", proc); 1381 return; 1382 } 1383 final Long rootProcId = getRootProcedureId(proc); 1384 if (rootProcId == null) { 1385 // The 'proc' was ready to run but the root procedure was rolledback 1386 LOG.warn("Rollback because parent is done/rolledback proc=" + proc); 1387 executeRollback(proc); 1388 return; 1389 } 1390 1391 RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId); 1392 if (procStack == null) { 1393 LOG.warn("RootProcedureState is null for " + proc.getProcId()); 1394 return; 1395 } 1396 do { 1397 // Try to acquire the execution 1398 if (!procStack.acquire(proc)) { 1399 if (procStack.setRollback()) { 1400 // we have the 'rollback-lock' we can start rollingback 1401 switch (executeRollback(rootProcId, procStack)) { 1402 case LOCK_ACQUIRED: 1403 break; 1404 case LOCK_YIELD_WAIT: 1405 procStack.unsetRollback(); 1406 scheduler.yield(proc); 1407 break; 1408 case LOCK_EVENT_WAIT: 1409 LOG.info("LOCK_EVENT_WAIT rollback..." 
+ proc); 1410 procStack.unsetRollback(); 1411 break; 1412 default: 1413 throw new UnsupportedOperationException(); 1414 } 1415 } else { 1416 // if we can't rollback means that some child is still running. 1417 // the rollback will be executed after all the children are done. 1418 // If the procedure was never executed, remove and mark it as rolledback. 1419 if (!proc.wasExecuted()) { 1420 switch (executeRollback(proc)) { 1421 case LOCK_ACQUIRED: 1422 break; 1423 case LOCK_YIELD_WAIT: 1424 scheduler.yield(proc); 1425 break; 1426 case LOCK_EVENT_WAIT: 1427 LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc); 1428 break; 1429 default: 1430 throw new UnsupportedOperationException(); 1431 } 1432 } 1433 } 1434 break; 1435 } 1436 1437 // Execute the procedure 1438 assert proc.getState() == ProcedureState.RUNNABLE : proc; 1439 // Note that lock is NOT about concurrency but rather about ensuring 1440 // ownership of a procedure of an entity such as a region or table 1441 LockState lockState = acquireLock(proc); 1442 switch (lockState) { 1443 case LOCK_ACQUIRED: 1444 execProcedure(procStack, proc); 1445 break; 1446 case LOCK_YIELD_WAIT: 1447 LOG.info(lockState + " " + proc); 1448 scheduler.yield(proc); 1449 break; 1450 case LOCK_EVENT_WAIT: 1451 // Someone will wake us up when the lock is available 1452 LOG.debug(lockState + " " + proc); 1453 break; 1454 default: 1455 throw new UnsupportedOperationException(); 1456 } 1457 procStack.release(proc); 1458 1459 if (proc.isSuccess()) { 1460 // update metrics on finishing the procedure 1461 proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true); 1462 LOG.info("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime())); 1463 // Finalize the procedure state 1464 if (proc.getProcId() == rootProcId) { 1465 procedureFinished(proc); 1466 } else { 1467 execCompletionCleanup(proc); 1468 } 1469 break; 1470 } 1471 } while (procStack.isFailed()); 1472 } 1473 1474 private LockState 
acquireLock(Procedure<TEnvironment> proc) { 1475 TEnvironment env = getEnvironment(); 1476 // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if 1477 // hasLock is true. 1478 if (proc.hasLock()) { 1479 return LockState.LOCK_ACQUIRED; 1480 } 1481 return proc.doAcquireLock(env, store); 1482 } 1483 1484 private void releaseLock(Procedure<TEnvironment> proc, boolean force) { 1485 TEnvironment env = getEnvironment(); 1486 // For how the framework works, we know that we will always have the lock 1487 // when we call releaseLock(), so we can avoid calling proc.hasLock() 1488 if (force || !proc.holdLock(env) || proc.isFinished()) { 1489 proc.doReleaseLock(env, store); 1490 } 1491 } 1492 1493 // Returning null means we have already held the execution lock, so you do not need to get the 1494 // lock entry for releasing 1495 private IdLock.Entry getLockEntryForRollback(long procId) { 1496 // Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need 1497 // this check, as the worker will hold the lock before executing a procedure. This is the only 1498 // place where we may hold two procedure execution locks, and there is a fence in the 1499 // RootProcedureState where we can make sure that only one worker can execute the rollback of 1500 // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to 1501 // prevent race between us and the force update thread. 
1502 if (!procExecutionLock.isHeldByCurrentThread(procId)) { 1503 try { 1504 return procExecutionLock.getLockEntry(procId); 1505 } catch (IOException e) { 1506 // can only happen if interrupted, so not a big deal to propagate it 1507 throw new UncheckedIOException(e); 1508 } 1509 } 1510 return null; 1511 } 1512 1513 private void executeUnexpectedRollback(Procedure<TEnvironment> rootProc, 1514 RootProcedureState<TEnvironment> procStack) { 1515 if (procStack.getSubprocs() != null) { 1516 // comparing proc id in reverse order, so we will delete later procedures first, otherwise we 1517 // may delete parent procedure first and if we fail in the middle of this operation, when 1518 // loading we will find some orphan procedures 1519 PriorityQueue<Procedure<TEnvironment>> pq = 1520 new PriorityQueue<>(procStack.getSubprocs().size(), 1521 Comparator.<Procedure<TEnvironment>> comparingLong(Procedure::getProcId).reversed()); 1522 pq.addAll(procStack.getSubprocs()); 1523 for (;;) { 1524 Procedure<TEnvironment> subproc = pq.poll(); 1525 if (subproc == null) { 1526 break; 1527 } 1528 if (!procedures.containsKey(subproc.getProcId())) { 1529 // this means it has already been rolledback 1530 continue; 1531 } 1532 IdLock.Entry lockEntry = getLockEntryForRollback(subproc.getProcId()); 1533 try { 1534 cleanupAfterRollbackOneStep(subproc); 1535 execCompletionCleanup(subproc); 1536 } finally { 1537 if (lockEntry != null) { 1538 procExecutionLock.releaseLockEntry(lockEntry); 1539 } 1540 } 1541 } 1542 } 1543 IdLock.Entry lockEntry = getLockEntryForRollback(rootProc.getProcId()); 1544 try { 1545 cleanupAfterRollbackOneStep(rootProc); 1546 } finally { 1547 if (lockEntry != null) { 1548 procExecutionLock.releaseLockEntry(lockEntry); 1549 } 1550 } 1551 } 1552 1553 private LockState executeNormalRollback(Procedure<TEnvironment> rootProc, 1554 RootProcedureState<TEnvironment> procStack) { 1555 List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack(); 1556 assert 
subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc; 1557 1558 int stackTail = subprocStack.size(); 1559 while (stackTail-- > 0) { 1560 Procedure<TEnvironment> proc = subprocStack.get(stackTail); 1561 IdLock.Entry lockEntry = getLockEntryForRollback(proc.getProcId()); 1562 try { 1563 // For the sub procedures which are successfully finished, we do not rollback them. 1564 // Typically, if we want to rollback a procedure, we first need to rollback it, and then 1565 // recursively rollback its ancestors. The state changes which are done by sub procedures 1566 // should be handled by parent procedures when rolling back. For example, when rolling back 1567 // a MergeTableProcedure, we will schedule new procedures to bring the offline regions 1568 // online, instead of rolling back the original procedures which offlined the regions(in 1569 // fact these procedures can not be rolled back...). 1570 if (proc.isSuccess()) { 1571 // Just do the cleanup work, without actually executing the rollback 1572 subprocStack.remove(stackTail); 1573 cleanupAfterRollbackOneStep(proc); 1574 continue; 1575 } 1576 LockState lockState = acquireLock(proc); 1577 if (lockState != LockState.LOCK_ACQUIRED) { 1578 // can't take a lock on the procedure, add the root-proc back on the 1579 // queue waiting for the lock availability 1580 return lockState; 1581 } 1582 1583 lockState = executeRollback(proc); 1584 releaseLock(proc, false); 1585 boolean abortRollback = lockState != LockState.LOCK_ACQUIRED; 1586 abortRollback |= !isRunning() || !store.isRunning(); 1587 1588 // allows to kill the executor before something is stored to the wal. 1589 // useful to test the procedure recovery. 
1590 if (abortRollback) { 1591 return lockState; 1592 } 1593 1594 subprocStack.remove(stackTail); 1595 1596 // if the procedure is kind enough to pass the slot to someone else, yield 1597 // if the proc is already finished, do not yield 1598 if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) { 1599 return LockState.LOCK_YIELD_WAIT; 1600 } 1601 1602 if (proc != rootProc) { 1603 execCompletionCleanup(proc); 1604 } 1605 } finally { 1606 if (lockEntry != null) { 1607 procExecutionLock.releaseLockEntry(lockEntry); 1608 } 1609 } 1610 } 1611 return LockState.LOCK_ACQUIRED; 1612 } 1613 1614 /** 1615 * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the 1616 * root-procedure will be visible as finished to user, and the result will be the fatal exception. 1617 */ 1618 private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) { 1619 Procedure<TEnvironment> rootProc = procedures.get(rootProcId); 1620 RemoteProcedureException exception = rootProc.getException(); 1621 // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are 1622 // rolling back because the subprocedure does. Clarify. 1623 if (exception == null) { 1624 exception = procStack.getException(); 1625 rootProc.setFailure(exception); 1626 store.update(rootProc); 1627 } 1628 1629 if (procStack.isRollbackSupported()) { 1630 LockState lockState = executeNormalRollback(rootProc, procStack); 1631 if (lockState != LockState.LOCK_ACQUIRED) { 1632 return lockState; 1633 } 1634 } else { 1635 // the procedure does not support rollback, so typically we should not reach here, this 1636 // usually means there are code bugs, let's just wait all the subprocedures to finish and then 1637 // mark the root procedure as failure. 
1638 LOG.error(HBaseMarkers.FATAL, 1639 "Root Procedure {} does not support rollback but the execution failed" 1640 + " and try to rollback, code bug?", 1641 rootProc, exception); 1642 executeUnexpectedRollback(rootProc, procStack); 1643 } 1644 1645 IdLock.Entry lockEntry = getLockEntryForRollback(rootProc.getProcId()); 1646 try { 1647 // Finalize the procedure state 1648 LOG.info("Rolled back {} exec-time={}", rootProc, 1649 StringUtils.humanTimeDiff(rootProc.elapsedTime())); 1650 procedureFinished(rootProc); 1651 } finally { 1652 if (lockEntry != null) { 1653 procExecutionLock.releaseLockEntry(lockEntry); 1654 } 1655 } 1656 1657 return LockState.LOCK_ACQUIRED; 1658 } 1659 1660 private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) { 1661 if (testing != null && testing.shouldKillBeforeStoreUpdateInRollback()) { 1662 kill("TESTING: Kill BEFORE store update in rollback: " + proc); 1663 } 1664 if (proc.removeStackIndex()) { 1665 if (!proc.isSuccess()) { 1666 proc.setState(ProcedureState.ROLLEDBACK); 1667 } 1668 1669 // update metrics on finishing the procedure (fail) 1670 proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), false); 1671 1672 if (proc.hasParent()) { 1673 store.delete(proc.getProcId()); 1674 procedures.remove(proc.getProcId()); 1675 } else { 1676 final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds(); 1677 if (childProcIds != null) { 1678 store.delete(proc, childProcIds); 1679 } else { 1680 store.update(proc); 1681 } 1682 } 1683 } else { 1684 store.update(proc); 1685 } 1686 } 1687 1688 /** 1689 * Execute the rollback of the procedure step. It updates the store with the new state (stack 1690 * index) or will remove completly the procedure in case it is a child. 
1691 */ 1692 private LockState executeRollback(Procedure<TEnvironment> proc) { 1693 try { 1694 proc.doRollback(getEnvironment()); 1695 } catch (IOException e) { 1696 LOG.debug("Roll back attempt failed for {}", proc, e); 1697 return LockState.LOCK_YIELD_WAIT; 1698 } catch (InterruptedException e) { 1699 handleInterruptedException(proc, e); 1700 return LockState.LOCK_YIELD_WAIT; 1701 } catch (Throwable e) { 1702 // Catch NullPointerExceptions or similar errors... 1703 LOG.error(HBaseMarkers.FATAL, "CODE-BUG: Uncaught runtime exception for " + proc, e); 1704 } 1705 1706 cleanupAfterRollbackOneStep(proc); 1707 1708 return LockState.LOCK_ACQUIRED; 1709 } 1710 1711 private void yieldProcedure(Procedure<TEnvironment> proc) { 1712 releaseLock(proc, false); 1713 scheduler.yield(proc); 1714 } 1715 1716 /** 1717 * Executes <code>procedure</code> 1718 * <ul> 1719 * <li>Calls the doExecute() of the procedure 1720 * <li>If the procedure execution didn't fail (i.e. valid user input) 1721 * <ul> 1722 * <li>...and returned subprocedures 1723 * <ul> 1724 * <li>The subprocedures are initialized. 
1725 * <li>The subprocedures are added to the store 1726 * <li>The subprocedures are added to the runnable queue 1727 * <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete 1728 * </ul> 1729 * </li> 1730 * <li>...if there are no subprocedure 1731 * <ul> 1732 * <li>the procedure completed successfully 1733 * <li>if there is a parent (WAITING) 1734 * <li>the parent state will be set to RUNNABLE 1735 * </ul> 1736 * </li> 1737 * </ul> 1738 * </li> 1739 * <li>In case of failure 1740 * <ul> 1741 * <li>The store is updated with the new state</li> 1742 * <li>The executor (caller of this method) will start the rollback of the procedure</li> 1743 * </ul> 1744 * </li> 1745 * </ul> 1746 */ 1747 private void execProcedure(RootProcedureState<TEnvironment> procStack, 1748 Procedure<TEnvironment> procedure) { 1749 Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE, 1750 "NOT RUNNABLE! " + procedure.toString()); 1751 1752 // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException. 1753 // The exception is caught below and then we hurry to the exit without disturbing state. The 1754 // idea is that the processing of this procedure will be unsuspended later by an external event 1755 // such the report of a region open. 1756 boolean suspended = false; 1757 1758 // Whether to 're-' -execute; run through the loop again. 
1759 boolean reExecute = false; 1760 1761 Procedure<TEnvironment>[] subprocs = null; 1762 do { 1763 reExecute = false; 1764 procedure.resetPersistence(); 1765 try { 1766 subprocs = procedure.doExecute(getEnvironment()); 1767 if (subprocs != null && subprocs.length == 0) { 1768 subprocs = null; 1769 } 1770 } catch (ProcedureSuspendedException e) { 1771 LOG.trace("Suspend {}", procedure); 1772 suspended = true; 1773 } catch (ProcedureYieldException e) { 1774 LOG.trace("Yield {}", procedure, e); 1775 yieldProcedure(procedure); 1776 return; 1777 } catch (InterruptedException e) { 1778 LOG.trace("Yield interrupt {}", procedure, e); 1779 handleInterruptedException(procedure, e); 1780 yieldProcedure(procedure); 1781 return; 1782 } catch (Throwable e) { 1783 // Catch NullPointerExceptions or similar errors... 1784 String msg = "CODE-BUG: Uncaught runtime exception: " + procedure; 1785 LOG.error(msg, e); 1786 procedure.setFailure(new RemoteProcedureException(msg, e)); 1787 } 1788 1789 if (!procedure.isFailed()) { 1790 if (subprocs != null) { 1791 if (subprocs.length == 1 && subprocs[0] == procedure) { 1792 // Procedure returned itself. Quick-shortcut for a state machine-like procedure; 1793 // i.e. we go around this loop again rather than go back out on the scheduler queue. 1794 subprocs = null; 1795 reExecute = true; 1796 LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId()); 1797 } else { 1798 // Yield the current procedure, and make the subprocedure runnable 1799 // subprocs may come back 'null'. 1800 subprocs = initializeChildren(procStack, procedure, subprocs); 1801 LOG.info("Initialized subprocedures=" + (subprocs == null 1802 ? 
null 1803 : Stream.of(subprocs).map(e -> "{" + e.toString() + "}").collect(Collectors.toList()) 1804 .toString())); 1805 } 1806 } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { 1807 LOG.trace("Added to timeoutExecutor {}", procedure); 1808 timeoutExecutor.add(procedure); 1809 } else if (!suspended) { 1810 // No subtask, so we are done 1811 procedure.setState(ProcedureState.SUCCESS); 1812 } 1813 } 1814 1815 // allows to kill the executor before something is stored to the wal. 1816 // useful to test the procedure recovery. 1817 if ( 1818 testing != null && testing.shouldKillBeforeStoreUpdate(suspended, procedure.hasParent()) 1819 ) { 1820 kill("TESTING: Kill BEFORE store update: " + procedure); 1821 } 1822 1823 // TODO: The code here doesn't check if store is running before persisting to the store as 1824 // it relies on the method call below to throw RuntimeException to wind up the stack and 1825 // executor thread to stop. The statement following the method call below seems to check if 1826 // store is not running, to prevent scheduling children procedures, re-execution or yield 1827 // of this procedure. This may need more scrutiny and subsequent cleanup in future 1828 // 1829 // Commit the transaction even if a suspend (state may have changed). Note this append 1830 // can take a bunch of time to complete. 
1831 if (procedure.needPersistence()) { 1832 // Add the procedure to the stack 1833 // See HBASE-28210 on why we need synchronized here 1834 boolean needUpdateStoreOutsideLock = false; 1835 synchronized (procStack) { 1836 if (procStack.addRollbackStep(procedure)) { 1837 updateStoreOnExec(procStack, procedure, subprocs); 1838 } else { 1839 needUpdateStoreOutsideLock = true; 1840 } 1841 } 1842 // this is an optimization if we do not need to maintain rollback step, as all subprocedures 1843 // of the same root procedure share the same root procedure state, if we can only update 1844 // store under the above lock, the sub procedures of the same root procedure can only be 1845 // persistent sequentially, which will have a bad performance. See HBASE-28212 for more 1846 // details. 1847 if (needUpdateStoreOutsideLock) { 1848 updateStoreOnExec(procStack, procedure, subprocs); 1849 } 1850 } 1851 1852 // if the store is not running we are aborting 1853 if (!store.isRunning()) { 1854 return; 1855 } 1856 // if the procedure is kind enough to pass the slot to someone else, yield 1857 if ( 1858 procedure.isRunnable() && !suspended 1859 && procedure.isYieldAfterExecutionStep(getEnvironment()) 1860 ) { 1861 yieldProcedure(procedure); 1862 return; 1863 } 1864 1865 assert (reExecute && subprocs == null) || !reExecute; 1866 } while (reExecute); 1867 1868 // Allows to kill the executor after something is stored to the WAL but before the below 1869 // state settings are done -- in particular the one on the end where we make parent 1870 // RUNNABLE again when its children are done; see countDownChildren. 
1871 if (testing != null && testing.shouldKillAfterStoreUpdate(suspended)) { 1872 kill("TESTING: Kill AFTER store update: " + procedure); 1873 } 1874 1875 // Submit the new subprocedures 1876 if (subprocs != null && !procedure.isFailed()) { 1877 submitChildrenProcedures(subprocs); 1878 } 1879 1880 // we need to log the release lock operation before waking up the parent procedure, as there 1881 // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all 1882 // the sub procedures from store and cause problems... 1883 releaseLock(procedure, false); 1884 1885 // if the procedure is complete and has a parent, count down the children latch. 1886 // If 'suspended', do nothing to change state -- let other threads handle unsuspend event. 1887 if (!suspended && procedure.isFinished() && procedure.hasParent()) { 1888 countDownChildren(procStack, procedure); 1889 } 1890 } 1891 1892 private void kill(String msg) { 1893 LOG.debug(msg); 1894 stop(); 1895 throw new RuntimeException(msg); 1896 } 1897 1898 private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack, 1899 Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) { 1900 assert subprocs != null : "expected subprocedures"; 1901 final long rootProcId = getRootProcedureId(procedure); 1902 for (int i = 0; i < subprocs.length; ++i) { 1903 Procedure<TEnvironment> subproc = subprocs[i]; 1904 if (subproc == null) { 1905 String msg = "subproc[" + i + "] is null, aborting the procedure"; 1906 procedure 1907 .setFailure(new RemoteProcedureException(msg, new IllegalArgumentIOException(msg))); 1908 return null; 1909 } 1910 1911 assert subproc.getState() == ProcedureState.INITIALIZING : subproc; 1912 subproc.setParentProcId(procedure.getProcId()); 1913 subproc.setRootProcId(rootProcId); 1914 subproc.setProcId(nextProcId()); 1915 procStack.addSubProcedure(subproc); 1916 } 1917 1918 if (!procedure.isFailed()) { 1919 
procedure.setChildrenLatch(subprocs.length); 1920 switch (procedure.getState()) { 1921 case RUNNABLE: 1922 procedure.setState(ProcedureState.WAITING); 1923 break; 1924 case WAITING_TIMEOUT: 1925 timeoutExecutor.add(procedure); 1926 break; 1927 default: 1928 break; 1929 } 1930 } 1931 return subprocs; 1932 } 1933 1934 private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) { 1935 for (int i = 0; i < subprocs.length; ++i) { 1936 Procedure<TEnvironment> subproc = subprocs[i]; 1937 subproc.updateMetricsOnSubmit(getEnvironment()); 1938 assert !procedures.containsKey(subproc.getProcId()); 1939 procedures.put(subproc.getProcId(), subproc); 1940 scheduler.addFront(subproc); 1941 } 1942 } 1943 1944 private void countDownChildren(RootProcedureState<TEnvironment> procStack, 1945 Procedure<TEnvironment> procedure) { 1946 Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId()); 1947 if (parent == null) { 1948 assert procStack.isRollingback(); 1949 return; 1950 } 1951 1952 // If this procedure is the last child awake the parent procedure 1953 if (parent.tryRunnable()) { 1954 // If we succeeded in making the parent runnable -- i.e. all of its 1955 // children have completed, move parent to front of the queue. 
1956 store.update(parent); 1957 scheduler.addFront(parent); 1958 LOG.info("Finished subprocedure pid={}, resume processing ppid={}", procedure.getProcId(), 1959 parent.getProcId()); 1960 return; 1961 } 1962 } 1963 1964 private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack, 1965 Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) { 1966 if (subprocs != null && !procedure.isFailed()) { 1967 if (LOG.isTraceEnabled()) { 1968 LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs)); 1969 } 1970 store.insert(procedure, subprocs); 1971 } else { 1972 LOG.trace("Store update {}", procedure); 1973 if (procedure.isFinished() && !procedure.hasParent()) { 1974 // remove child procedures 1975 final long[] childProcIds = procStack.getSubprocedureIds(); 1976 if (childProcIds != null) { 1977 store.delete(procedure, childProcIds); 1978 for (int i = 0; i < childProcIds.length; ++i) { 1979 procedures.remove(childProcIds[i]); 1980 } 1981 } else { 1982 store.update(procedure); 1983 } 1984 } else { 1985 store.update(procedure); 1986 } 1987 } 1988 } 1989 1990 private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) { 1991 LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e); 1992 // NOTE: We don't call Thread.currentThread().interrupt() 1993 // because otherwise all the subsequent calls e.g. Thread.sleep() will throw 1994 // the InterruptedException. If the master is going down, we will be notified 1995 // and the executor/store will be stopped. 
1996 // (The interrupted procedure will be retried on the next run) 1997 } 1998 1999 private void execCompletionCleanup(Procedure<TEnvironment> proc) { 2000 final TEnvironment env = getEnvironment(); 2001 if (proc.hasLock()) { 2002 LOG.warn("Usually this should not happen, we will release the lock before if the procedure" 2003 + " is finished, even if the holdLock is true, arrive here means we have some holes where" 2004 + " we do not release the lock. And the releaseLock below may fail since the procedure may" 2005 + " have already been deleted from the procedure store."); 2006 releaseLock(proc, true); 2007 } 2008 try { 2009 proc.completionCleanup(env); 2010 } catch (Throwable e) { 2011 // Catch NullPointerExceptions or similar errors... 2012 LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e); 2013 } 2014 } 2015 2016 private void procedureFinished(Procedure<TEnvironment> proc) { 2017 // call the procedure completion cleanup handler 2018 execCompletionCleanup(proc); 2019 2020 CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc); 2021 2022 // update the executor internal state maps 2023 if (!proc.shouldWaitClientAck(getEnvironment())) { 2024 retainer.setClientAckTime(0); 2025 } 2026 2027 completed.put(proc.getProcId(), retainer); 2028 rollbackStack.remove(proc.getProcId()); 2029 procedures.remove(proc.getProcId()); 2030 2031 // call the runnableSet completion cleanup handler 2032 try { 2033 scheduler.completionCleanup(proc); 2034 } catch (Throwable e) { 2035 // Catch NullPointerExceptions or similar errors... 
2036 LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e); 2037 } 2038 2039 // Notify the listeners 2040 sendProcedureFinishedNotification(proc.getProcId()); 2041 } 2042 2043 RootProcedureState<TEnvironment> getProcStack(long rootProcId) { 2044 return rollbackStack.get(rootProcId); 2045 } 2046 2047 ProcedureScheduler getProcedureScheduler() { 2048 return scheduler; 2049 } 2050 2051 int getCompletedSize() { 2052 return completed.size(); 2053 } 2054 2055 public IdLock getProcExecutionLock() { 2056 return procExecutionLock; 2057 } 2058 2059 // ========================================================================== 2060 // Worker Thread 2061 // ========================================================================== 2062 private class WorkerThread extends StoppableThread { 2063 private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE); 2064 private volatile Procedure<TEnvironment> activeProcedure; 2065 2066 public WorkerThread(ThreadGroup group) { 2067 this(group, "PEWorker-"); 2068 } 2069 2070 protected WorkerThread(ThreadGroup group, String prefix) { 2071 super(group, prefix + workerId.incrementAndGet()); 2072 setDaemon(true); 2073 } 2074 2075 @Override 2076 public void sendStopSignal() { 2077 scheduler.signalAll(); 2078 } 2079 2080 /** 2081 * Encapsulates execution of the current {@link #activeProcedure} for easy tracing. 
2082 */ 2083 private long runProcedure() throws IOException { 2084 final Procedure<TEnvironment> proc = this.activeProcedure; 2085 int activeCount = activeExecutorCount.incrementAndGet(); 2086 int runningCount = store.setRunningProcedureCount(activeCount); 2087 LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(), runningCount, 2088 activeCount); 2089 executionStartTime.set(EnvironmentEdgeManager.currentTime()); 2090 IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId()); 2091 try { 2092 executeProcedure(proc); 2093 } catch (AssertionError e) { 2094 LOG.info("ASSERT pid=" + proc.getProcId(), e); 2095 throw e; 2096 } finally { 2097 procExecutionLock.releaseLockEntry(lockEntry); 2098 activeCount = activeExecutorCount.decrementAndGet(); 2099 runningCount = store.setRunningProcedureCount(activeCount); 2100 LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(), runningCount, 2101 activeCount); 2102 this.activeProcedure = null; 2103 executionStartTime.set(Long.MAX_VALUE); 2104 } 2105 return EnvironmentEdgeManager.currentTime(); 2106 } 2107 2108 @Override 2109 public void run() { 2110 long lastUpdate = EnvironmentEdgeManager.currentTime(); 2111 try { 2112 while (isRunning() && keepAlive(lastUpdate)) { 2113 @SuppressWarnings("unchecked") 2114 Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); 2115 if (proc == null) { 2116 continue; 2117 } 2118 this.activeProcedure = proc; 2119 lastUpdate = TraceUtil.trace(this::runProcedure, new ProcedureSpanBuilder(proc)); 2120 } 2121 } catch (Throwable t) { 2122 LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t); 2123 } finally { 2124 LOG.trace("Worker terminated."); 2125 } 2126 workerThreads.remove(this); 2127 } 2128 2129 @Override 2130 public String toString() { 2131 Procedure<?> p = this.activeProcedure; 2132 return getName() + "(pid=" + (p == null ? 
Procedure.NO_PROC_ID : p.getProcId() + ")"); 2133 } 2134 2135 /** Returns the time since the current procedure is running */ 2136 public long getCurrentRunTime() { 2137 return EnvironmentEdgeManager.currentTime() - executionStartTime.get(); 2138 } 2139 2140 // core worker never timeout 2141 protected boolean keepAlive(long lastUpdate) { 2142 return true; 2143 } 2144 } 2145 2146 // A worker thread which can be added when core workers are stuck. Will timeout after 2147 // keepAliveTime if there is no procedure to run. 2148 private final class KeepAliveWorkerThread extends WorkerThread { 2149 public KeepAliveWorkerThread(ThreadGroup group) { 2150 super(group, "KeepAlivePEWorker-"); 2151 } 2152 2153 @Override 2154 protected boolean keepAlive(long lastUpdate) { 2155 return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime; 2156 } 2157 } 2158 2159 // ---------------------------------------------------------------------------- 2160 // TODO-MAYBE: Should we provide a InlineChore to notify the store with the 2161 // full set of procedures pending and completed to write a compacted 2162 // version of the log (in case is a log)? 2163 // In theory no, procedures are have a short life, so at some point the store 2164 // will have the tracker saying everything is in the last log. 
// ----------------------------------------------------------------------------

  /**
   * Periodic chore that watches the worker pool for stuck workers.
   * <p>
   * A worker is considered stuck once its current procedure has been running for at least
   * {@code stuckThreshold} milliseconds. A single {@link KeepAliveWorkerThread} is started when
   * all of the following hold:
   * <ul>
   * <li>at least one worker is stuck;</li>
   * <li>the scheduler still has runnable procedures;</li>
   * <li>the stuck share of the pool reaches {@code addWorkerStuckPercentage};</li>
   * <li>the pool is below {@code maxPoolSize}.</li>
   * </ul>
   * The tunables are read from the configuration on construction and again after every run.
   */
  private final class WorkerMonitor extends InlineChore {
    /** Key for the monitor interval (msec) reported by {@link #getTimeoutInterval()}. */
    public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =
      "hbase.procedure.worker.monitor.interval.msec";
    private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5_000; // 5 seconds

    /** Key for the run time (msec) after which a worker is reported as stuck. */
    public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =
      "hbase.procedure.worker.stuck.threshold.msec";
    private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10_000; // 10 seconds

    /** Key for the stuck share of the pool (0..1) that triggers adding a worker. */
    public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =
      "hbase.procedure.worker.add.stuck.percentage";
    private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // half the pool

    private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;
    private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;
    private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;

    public WorkerMonitor() {
      refreshConfig();
    }

    @Override
    public void run() {
      checkThreadCount(checkForStuckWorkers());
      // Re-read the tunables so configuration changes take effect on the next run.
      refreshConfig();
    }

    /** Warns about every stuck worker and returns how many there were. */
    private int checkForStuckWorkers() {
      int stuck = 0;
      for (WorkerThread candidate : workerThreads) {
        if (candidate.getCurrentRunTime() >= stuckThreshold) {
          stuck++;
          LOG.warn("Worker stuck {}, run time {}", candidate,
            StringUtils.humanTimeDiff(candidate.getCurrentRunTime()));
        }
      }
      return stuck;
    }

    /** Starts one extra keep-alive worker when enough of the pool is stuck and work is pending. */
    private void checkThreadCount(final int stuckCount) {
      final boolean anyStuck = stuckCount > 0;
      if (!anyStuck || !scheduler.hasRunnables()) {
        return;
      }

      // Grow eagerly: extra workers time out on their own once there is nothing left to run.
      final float stuckFraction = (float) stuckCount / workerThreads.size();
      final boolean shouldGrow =
        stuckFraction >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize;
      if (!shouldGrow) {
        return;
      }

      final KeepAliveWorkerThread extraWorker = new KeepAliveWorkerThread(threadGroup);
      workerThreads.add(extraWorker);
      extraWorker.start();
      LOG.debug("Added new worker thread {}", extraWorker);
    }

    /** Reloads the three tunables from {@code conf}, falling back to the defaults above. */
    private void refreshConfig() {
      addWorkerStuckPercentage =
        conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY, DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);
      timeoutInterval =
        conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY, DEFAULT_WORKER_MONITOR_INTERVAL);
      stuckThreshold =
        conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY, DEFAULT_WORKER_STUCK_THRESHOLD);
    }

    @Override
    public int getTimeoutInterval() {
      return timeoutInterval;
    }
  }
} // end of the enclosing executor class