/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The context, and return value, for a single submit/submitAll call. Note on how this class (one AP
 * submit) works. Initially, all requests are split into groups by server; requests are sent to each
 * server in parallel; the RPC calls are not async so a thread per server is used. Every time some
 * actions fail, regions/locations might have changed, so we re-group them by server and region
 * again and send these groups in parallel too. The result, in case of retries, is a "tree" of
 * threads, with parent exiting after scheduling children. This is why lots of code doesn't require
 * any synchronization.
 */
@InterfaceAudience.Private
class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncRequestFutureImpl.class);

  private RetryingTimeTracker tracker;

  /**
   * Runnable (that can be submitted to thread pool) that waits for when it's time to issue replica
   * calls, finds region replicas, groups the requests by replica and issues the calls (on separate
   * threads, via sendMultiAction). This is done on a separate thread because we don't want to wait
   * on user thread for our asynchronous call, and usually we have to wait before making replica
   * calls.
   */
  private final class ReplicaCallIssuingRunnable implements Runnable {
    private final long startTime;
    private final List<Action> initialActions;

    public ReplicaCallIssuingRunnable(List<Action> initialActions, long startTime) {
      this.initialActions = initialActions;
      this.startTime = startTime;
    }

    @Override
    public void run() {
      boolean done = false;
      if (asyncProcess.primaryCallTimeoutMicroseconds > 0) {
        try {
          done = waitUntilDone(startTime * 1000L + asyncProcess.primaryCallTimeoutMicroseconds);
        } catch (InterruptedException ex) {
          LOG.error("Replica thread interrupted - no replica calls {}", ex.getMessage());
          return;
        }
      }
      if (done) return; // Done within primary timeout
      Map<ServerName, MultiAction> actionsByServer = new HashMap<>();
      List<Action> unknownLocActions = new ArrayList<>();
      if (replicaGetIndices == null) {
        for (int i = 0; i < results.length; ++i) {
          addReplicaActions(i, actionsByServer, unknownLocActions);
        }
      } else {
        for (int replicaGetIndice : replicaGetIndices) {
          addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions);
        }
      }
      if (!actionsByServer.isEmpty()) {
        sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
      }
      if (!unknownLocActions.isEmpty()) {
        actionsByServer = new HashMap<>();
        for (Action action : unknownLocActions) {
          addReplicaActionsAgain(action, actionsByServer);
        }
        // Some actions may have completely failed, they are handled inside addAgain.
        if (!actionsByServer.isEmpty()) {
          sendMultiAction(actionsByServer, 1, null, true);
        }
      }
    }

    /**
     * Add replica actions to action map by server.
     * @param index           Index of the original action.
     * @param actionsByServer The map by server to add it to.
     */
    private void addReplicaActions(int index, Map<ServerName, MultiAction> actionsByServer,
      List<Action> unknownReplicaActions) {
      if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
      Action action = initialActions.get(index);
      RegionLocations loc = findAllLocationsOrFail(action, true);
      if (loc == null) return;
      HRegionLocation[] locs = loc.getRegionLocations();
      if (locs.length == 1) {
        LOG.warn("No replicas found for {}", action.getAction());
        return;
      }
      synchronized (replicaResultLock) {
        // Don't run replica calls if the original has finished. We could do it e.g. if
        // original has already failed before first replica call (unlikely given retries),
        // but that would require additional synchronization w.r.t. returning to caller.
        if (results[index] != null) return;
        // We set the number of calls here. After that any path must call setResult/setError.
        // True even for replicas that are not found - if we refuse to send we MUST set error.
        updateResult(index, new ReplicaResultState(locs.length));
      }
      for (int i = 1; i < locs.length; ++i) {
        Action replicaAction = new Action(action, i);
        if (locs[i] != null) {
          asyncProcess.addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
            replicaAction, actionsByServer, nonceGroup);
        } else {
          unknownReplicaActions.add(replicaAction);
        }
      }
    }

    private void addReplicaActionsAgain(Action action,
      Map<ServerName, MultiAction> actionsByServer) {
      if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
        throw new AssertionError("Cannot have default replica here");
      }
      HRegionLocation loc = getReplicaLocationOrFail(action);
      if (loc == null) return;
      asyncProcess.addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(), action,
        actionsByServer, nonceGroup);
    }
  }

  /**
   * Runnable (that can be submitted to thread pool) that submits MultiAction to a single server.
   * The server call is synchronous, therefore we do it on a thread pool.
   */
  final class SingleServerRequestRunnable implements Runnable {
    private final MultiAction multiAction;
    private final int numAttempt;
    private final ServerName server;
    private final Set<CancellableRegionServerCallable> callsInProgress;

    SingleServerRequestRunnable(MultiAction multiAction, int numAttempt, ServerName server,
      Set<CancellableRegionServerCallable> callsInProgress) {
      this.multiAction = multiAction;
      this.numAttempt = numAttempt;
      this.server = server;
      this.callsInProgress = callsInProgress;
    }

    @Override
    public void run() {
      AbstractResponse res = null;
      CancellableRegionServerCallable callable = currentCallable;
      try {
        // setup the callable based on the actions, if we don't have one already from the request
        if (callable == null) {
          callable = createCallable(server, tableName, multiAction);
        }
        RpcRetryingCaller<AbstractResponse> caller =
          asyncProcess.createCaller(callable, rpcTimeout);
        try {
          if (callsInProgress != null) {
            callsInProgress.add(callable);
          }
          res = caller.callWithoutRetries(callable, operationTimeout);
          if (res == null) {
            // Cancelled
            return;
          }
        } catch (OperationTimeoutExceededException e) {
          // The operation has timed out before executing the actual callable. This may be due to
          // slow/hotspotted meta or the operation timeout set too low for the number of requests.
          // Circumventing the usual failure flow ensures the meta cache is not cleared and will
          // not result in a doomed feedback loop in which the meta continues to be hotspotted.
          // See HBASE-27487
          failAll(multiAction, server, numAttempt, e);
          return;
        } catch (IOException e) {
          // The service itself failed. It may be an error coming from the communication
          // layer, but, as well, a functional error raised by the server.
          receiveGlobalFailure(multiAction, server, numAttempt, e, true);
          return;
        } catch (Throwable t) {
          // This should not happen. Let's log & retry anyway.
          LOG.error("id=" + asyncProcess.id + ", caught throwable. Unexpected. Retrying."
Server=" + server + ", tableName=" + tableName, t); 228 receiveGlobalFailure(multiAction, server, numAttempt, t, true); 229 return; 230 } 231 if (res.type() == AbstractResponse.ResponseType.MULTI) { 232 // Normal case: we received an answer from the server, and it's not an exception. 233 receiveMultiAction(multiAction, server, (MultiResponse) res, numAttempt); 234 } else { 235 if (results != null) { 236 SingleResponse singleResponse = (SingleResponse) res; 237 updateResult(0, singleResponse.getEntry()); 238 } 239 decActionCounter(1); 240 } 241 } catch (Throwable t) { 242 // Something really bad happened. We are on the send thread that will now die. 243 LOG.error("id=" + asyncProcess.id + " error for " + tableName + " processing " + server, t); 244 throw new RuntimeException(t); 245 } finally { 246 asyncProcess.decTaskCounters(multiAction.getRegions(), server); 247 if (callsInProgress != null && callable != null && res != null) { 248 callsInProgress.remove(callable); 249 } 250 } 251 } 252 } 253 254 private final Batch.Callback<CResult> callback; 255 private final BatchErrors errors; 256 private final ConnectionImplementation.ServerErrorTracker errorsByServer; 257 private final ExecutorService pool; 258 private final Set<CancellableRegionServerCallable> callsInProgress; 259 260 private final TableName tableName; 261 private final AtomicLong actionsInProgress = new AtomicLong(-1); 262 /** 263 * The lock controls access to results. It is only held when populating results where there might 264 * be several callers (eventual consistency gets). For other requests, there's one unique call 265 * going on per result index. 266 */ 267 private final Object replicaResultLock = new Object(); 268 /** 269 * Result array. Null if results are not needed. Otherwise, each index corresponds to the action 270 * index in initial actions submitted. For most request types, has null-s for requests that are 271 * not done, and result/exception for those that are done. For eventual-consistency gets, 272 * initially the same applies; at some point, replica calls might be started, and 273 * ReplicaResultState is put at the corresponding indices. The returning calls check the type to 274 * detect when this is the case. After all calls are done, ReplicaResultState-s are replaced with 275 * results for the user. 276 */ 277 private final Object[] results; 278 /** 279 * Indices of replica gets in results. If null, all or no actions are replica-gets. 280 */ 281 private final int[] replicaGetIndices; 282 private final boolean hasAnyReplicaGets; 283 private final long nonceGroup; 284 private final CancellableRegionServerCallable currentCallable; 285 private final int operationTimeout; 286 private final int rpcTimeout; 287 private final AsyncProcess asyncProcess; 288 private final Map<String, byte[]> requestAttributes; 289 290 /** 291 * For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only 292 * used to make logging more clear, we don't actually care why we don't retry. 293 */ 294 public enum Retry { 295 YES, 296 NO_LOCATION_PROBLEM, 297 NO_NOT_RETRIABLE, 298 NO_RETRIES_EXHAUSTED, 299 NO_OTHER_SUCCEEDED 300 } 301 302 /** 303 * Sync point for calls to multiple replicas for the same user request (Get). Created and put in 304 * the results array (we assume replica calls require results) when the replica calls are 305 * launched. See results for details of this process. POJO, all fields are public. To modify them, 306 * the object itself is locked. 
   */
  private static class ReplicaResultState {
    public ReplicaResultState(int callCount) {
      this.callCount = callCount;
    }

    /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
    int callCount;
    /**
     * Errors for which it is not decided whether we will report them to user. If one of the calls
     * succeeds, we will discard the errors that may have happened in the other calls.
     */
    BatchErrors replicaErrors = null;

    @Override
    public String toString() {
      return "[call count " + callCount + "; errors " + replicaErrors + "]";
    }
  }

  public AsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions, long nonceGroup,
    AsyncProcess asyncProcess) {
    this.pool = task.getPool();
    this.callback = task.getCallback();
    this.nonceGroup = nonceGroup;
    this.tableName = task.getTableName();
    this.actionsInProgress.set(actions.size());
    if (task.getResults() == null) {
      results = task.getNeedResults() ? new Object[actions.size()] : null;
    } else {
      if (task.getResults().length != actions.size()) {
        throw new AssertionError("results.length");
      }
      this.results = task.getResults();
      for (int i = 0; i != this.results.length; ++i) {
        results[i] = null;
      }
    }
    List<Integer> replicaGetIndices = null;
    boolean hasAnyReplicaGets = false;
    if (results != null) {
      // Check to see if any requests might require replica calls.
      // We expect that many requests will consist of all or no multi-replica gets; in such
      // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
      // store the list of action indexes for which replica gets are possible, and set
      // hasAnyReplicaGets to true.
      boolean hasAnyNonReplicaReqs = false;
      int posInList = 0;
      for (Action action : actions) {
        boolean isReplicaGet = AsyncProcess.isReplicaGet(action.getAction());
        if (isReplicaGet) {
          hasAnyReplicaGets = true;
          if (hasAnyNonReplicaReqs) { // Mixed case
            if (replicaGetIndices == null) {
              replicaGetIndices = new ArrayList<>(actions.size() - 1);
            }
            replicaGetIndices.add(posInList);
          }
        } else if (!hasAnyNonReplicaReqs) {
          // The first non-multi-replica request in the action list.
          hasAnyNonReplicaReqs = true;
          if (posInList > 0) {
            // Add all the previous requests to the index lists. We know they are all
            // replica-gets because this is the first non-multi-replica request in the list.
            replicaGetIndices = new ArrayList<>(actions.size() - 1);
            for (int i = 0; i < posInList; ++i) {
              replicaGetIndices.add(i);
            }
          }
        }
        ++posInList;
      }
    }
    this.hasAnyReplicaGets = hasAnyReplicaGets;
    if (replicaGetIndices != null) {
      this.replicaGetIndices = new int[replicaGetIndices.size()];
      int i = 0;
      for (Integer el : replicaGetIndices) {
        this.replicaGetIndices[i++] = el;
      }
    } else {
      this.replicaGetIndices = null;
    }
    this.callsInProgress = !hasAnyReplicaGets
      ? null
      : Collections
        .newSetFromMap(new ConcurrentHashMap<CancellableRegionServerCallable, Boolean>());
    this.asyncProcess = asyncProcess;
    this.errorsByServer = createServerErrorTracker();
    this.errors = new BatchErrors();
    this.operationTimeout = task.getOperationTimeout();
    this.rpcTimeout = task.getRpcTimeout();
    this.currentCallable = task.getCallable();
    if (task.getCallable() == null) {
      tracker = new RetryingTimeTracker().start();
    }
    this.requestAttributes = task.getRequestAttributes();
  }

  protected Set<CancellableRegionServerCallable> getCallsInProgress() {
    return callsInProgress;
  }

  SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt,
    ServerName server, Set<CancellableRegionServerCallable> callsInProgress) {
    return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress);
  }

  /**
   * Some checked calls send a callable with their own tracker. This method checks the operation
   * timeout against the appropriate tracker, or returns false if no tracker.
   */
  private boolean isOperationTimeoutExceeded() {
    // return value of 1 is special to mean exceeded, to differentiate from 0
    // which is no timeout. see implementation of RetryingTimeTracker.getRemainingTime
    return getRemainingTime() == 1;
  }

  private long getRemainingTime() {
    RetryingTimeTracker currentTracker;
    if (tracker != null) {
      currentTracker = tracker;
    } else if (currentCallable != null && currentCallable.getTracker() != null) {
      currentTracker = currentCallable.getTracker();
    } else {
      return 0;
    }

    // no-op if already started, this is just to ensure it was initialized (usually true)
    currentTracker.start();

    return currentTracker.getRemainingTime(operationTimeout);
  }

  /**
   * Group a list of actions by region server, and send them.
   * @param currentActions - the list of rows to submit
   * @param numAttempt     - the current numAttempt (first attempt is 1)
   */
  void groupAndSendMultiAction(List<Action> currentActions, int numAttempt) {
    Map<ServerName, MultiAction> actionsByServer = new HashMap<>();

    boolean isReplica = false;
    List<Action> unknownReplicaActions = null;
    for (Action action : currentActions) {
      if (isOperationTimeoutExceeded()) {
        String message = numAttempt == 1
          ? "Operation timeout exceeded during resolution of region locations, "
            + "prior to executing any actions."
          : "Operation timeout exceeded during re-resolution of region locations on retry "
            + (numAttempt - 1) + ".";

        message += " Meta may be slow or operation timeout too short for batch size or retries.";
        OperationTimeoutExceededException exception =
          new OperationTimeoutExceededException(message);

        // Clear any actions we already resolved, because none will have been executed yet
        // We are going to fail all passed actions because there's no way we can execute any
        // if operation timeout is exceeded.
        actionsByServer.clear();
        for (Action actionToFail : currentActions) {
          manageLocationError(actionToFail, exception);
        }
        return;
      }

      RegionLocations locs = findAllLocationsOrFail(action, true);
      if (locs == null) continue;
      boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
      if (isReplica && !isReplicaAction) {
        // This is the property of the current implementation, not a requirement.
        throw new AssertionError("Replica and non-replica actions in the same retry");
      }
      isReplica = isReplicaAction;
      HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
      if (loc == null || loc.getServerName() == null) {
        if (isReplica) {
          if (unknownReplicaActions == null) {
            unknownReplicaActions = new ArrayList<>(1);
          }
          unknownReplicaActions.add(action);
        } else {
          // TODO: relies on primary location always being fetched
          manageLocationError(action, null);
        }
      } else {
        byte[] regionName = loc.getRegionInfo().getRegionName();
        AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer,
          nonceGroup);
      }
    }
    boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
    boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();

    if (!actionsByServer.isEmpty()) {
      // If this is a first attempt to group and send, no replicas, we need the replica thread.
      sendMultiAction(actionsByServer, numAttempt,
        (doStartReplica && !hasUnknown) ? currentActions : null, numAttempt > 1 && !hasUnknown);
    }

    if (hasUnknown) {
      actionsByServer = new HashMap<>();
      for (Action action : unknownReplicaActions) {
        HRegionLocation loc = getReplicaLocationOrFail(action);
        if (loc == null) continue;
        byte[] regionName = loc.getRegionInfo().getRegionName();
        AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer,
          nonceGroup);
      }
      if (!actionsByServer.isEmpty()) {
        sendMultiAction(actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
      }
    }
  }

  private HRegionLocation getReplicaLocationOrFail(Action action) {
    // We are going to try to get the location once again. For each action, we'll do it once
    // from cache, because the previous calls in the loop might populate it.
    int replicaId = action.getReplicaId();
    RegionLocations locs = findAllLocationsOrFail(action, true);
    if (locs == null) return null; // manageError already called
    HRegionLocation loc = locs.getRegionLocation(replicaId);
    if (loc == null || loc.getServerName() == null) {
      locs = findAllLocationsOrFail(action, false);
      if (locs == null) return null; // manageError already called
      loc = locs.getRegionLocation(replicaId);
    }
    if (loc == null || loc.getServerName() == null) {
      manageLocationError(action, null);
      return null;
    }
    return loc;
  }

  private void manageLocationError(Action action, Exception ex) {
    String msg =
      "Cannot get replica " + action.getReplicaId() + " location for " + action.getAction();
    LOG.error(msg);
    if (ex == null) {
      ex = new IOException(msg);
    }
    manageError(action.getOriginalIndex(), action.getAction(), Retry.NO_LOCATION_PROBLEM, ex, null);
  }

  private RegionLocations findAllLocationsOrFail(Action action, boolean useCache) {
    if (action.getAction() == null)
      throw new IllegalArgumentException("#" + asyncProcess.id + ", row cannot be null");
    RegionLocations loc = null;
    try {
      loc = asyncProcess.connection.locateRegion(tableName, action.getAction().getRow(), useCache,
        true, action.getReplicaId());
    } catch (IOException ex) {
      manageLocationError(action, ex);
    }
    return loc;
  }

  /**
   * Send a multi action structure to the servers, after a delay depending on the attempt number.
   * Asynchronous.
   * @param actionsByServer         the actions structured by regions
   * @param numAttempt              the attempt number.
   * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
   */
  void sendMultiAction(Map<ServerName, MultiAction> actionsByServer, int numAttempt,
    List<Action> actionsForReplicaThread, boolean reuseThread) {
    boolean clearServerCache = true;
    // Run the last item on the same thread if we are already on a send thread.
    // We hope most of the time it will be the only item, so we can cut down on threads.
    int actionsRemaining = actionsByServer.size();
    // This iteration is by server (the HRegionLocation comparator is by server portion only).
    for (Map.Entry<ServerName, MultiAction> e : actionsByServer.entrySet()) {
      ServerName server = e.getKey();
      MultiAction multiAction = e.getValue();
      Collection<? extends Runnable> runnables =
        getNewMultiActionRunnable(server, multiAction, numAttempt);
      // make sure we correctly count the number of runnables before we try to reuse the send
      // thread, in case we had to split the request into different runnables because of backoff
      if (runnables.size() > actionsRemaining) {
        actionsRemaining = runnables.size();
      }

      // run all the runnables
      // HBASE-17475: Do not reuse the thread after the stack reaches a certain depth to prevent
      // stack overflow
      // for now, we use HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER to control the depth
      for (Runnable runnable : runnables) {
        if (
          (--actionsRemaining == 0) && reuseThread
            && numAttempt % HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER != 0
        ) {
          runnable.run();
        } else {
          try {
            pool.execute(runnable);
          } catch (Throwable t) {
            if (t instanceof RejectedExecutionException) {
              // This should never happen. But as the pool is provided by the end user,
              // let's secure this a little.
              LOG.warn("id=" + asyncProcess.id + ", task rejected by pool. Unexpected." + " Server="
                + server.getServerName(), t);
              // Do not update cache if exception is from failing to submit action to thread pool
              clearServerCache = false;
            } else {
              // see #HBASE-14359 for more details
              LOG.warn("Caught unexpected exception/error: ", t);
            }
            asyncProcess.decTaskCounters(multiAction.getRegions(), server);
            // We're likely to fail again, but this will increment the attempt counter,
            // so it will finish.
            receiveGlobalFailure(multiAction, server, numAttempt, t, clearServerCache);
          }
        }
      }
    }

    if (actionsForReplicaThread != null) {
      startWaitingForReplicaCalls(actionsForReplicaThread);
    }
  }

  @SuppressWarnings("MixedMutabilityReturnType")
  private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
    MultiAction multiAction, int numAttempt) {
    // no stats to manage, just do the standard action
    if (asyncProcess.connection.getStatisticsTracker() == null) {
      if (asyncProcess.connection.getConnectionMetrics() != null) {
        asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
      }
      asyncProcess.incTaskCounters(multiAction.getRegions(), server);
      SingleServerRequestRunnable runnable =
        createSingleServerRequest(multiAction, numAttempt, server, callsInProgress);

      // remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable
      return Collections.singletonList(runnable);
    }

    // group the actions by the amount of delay
    Map<Long, DelayingRunner> actions = new HashMap<>(multiAction.size());

    // split up the actions
    for (Map.Entry<byte[], List<Action>> e : multiAction.actions.entrySet()) {
      Long backoff = getBackoff(server, e.getKey());
      DelayingRunner runner = actions.get(backoff);
      if (runner == null) {
        actions.put(backoff, new DelayingRunner(backoff, e));
      } else {
        runner.add(e);
      }
    }

    List<Runnable> toReturn = new ArrayList<>(actions.size());
    for (DelayingRunner runner : actions.values()) {
      asyncProcess.incTaskCounters(runner.getActions().getRegions(), server);
      Runnable runnable =
        createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress);
      // use a delay runner only if we need to sleep for some time
      if (runner.getSleepTime() > 0) {
        runner.setRunner(runnable);
        runnable = runner;
        if (asyncProcess.connection.getConnectionMetrics() != null) {
          asyncProcess.connection.getConnectionMetrics()
            .incrDelayRunnersAndUpdateDelayInterval(runner.getSleepTime());
        }
      } else {
        if (asyncProcess.connection.getConnectionMetrics() != null) {
          asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
        }
      }
      // remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable
      toReturn.add(runnable);
    }
    return toReturn;
  }

  /**
   * @param server     server location where the target region is hosted
   * @param regionName name of the region to which we are going to write some data
   * @return the amount of time the client should wait before it submits a request to the specified
   *         server and region
   */
  private Long getBackoff(ServerName server, byte[] regionName) {
    ServerStatisticTracker tracker = asyncProcess.connection.getStatisticsTracker();
    ServerStatistics stats = tracker.getStats(server);
    return asyncProcess.connection.getBackoffPolicy().getBackoffTime(server, regionName, stats);
  }

  /**
   * Starts waiting to issue replica calls on a different thread; or issues them immediately.
   */
  private void startWaitingForReplicaCalls(List<Action> actionsForReplicaThread) {
    long startTime = EnvironmentEdgeManager.currentTime();
    ReplicaCallIssuingRunnable replicaRunnable =
      new ReplicaCallIssuingRunnable(actionsForReplicaThread, startTime);
    if (asyncProcess.primaryCallTimeoutMicroseconds == 0) {
      // Start replica calls immediately.
      replicaRunnable.run();
    } else {
      // Start the thread that may kick off replica gets.
      // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea.
      try {
        pool.execute(replicaRunnable);
      } catch (RejectedExecutionException ree) {
        LOG.warn("id=" + asyncProcess.id + " replica task rejected by pool; no replica calls", ree);
      }
    }
  }

  /**
   * Checks whether we can retry and acts accordingly: logs, sets the error status.
   * @param originalIndex the position in the list sent
   * @param row           the row
   * @param canRetry      if not YES, we won't retry whatever the settings.
   * @param throwable     the throwable, if any (can be null)
   * @param server        the location, if any (can be null)
   * @return the retry decision: YES if the action can be retried, otherwise the reason it cannot.
   */
  Retry manageError(int originalIndex, Row row, Retry canRetry, Throwable throwable,
    ServerName server) {
    if (canRetry == Retry.YES && throwable != null && throwable instanceof DoNotRetryIOException) {
      canRetry = Retry.NO_NOT_RETRIABLE;
    }

    if (canRetry != Retry.YES) {
      // Batch.Callback<Res> was not called on failure in 0.94. We keep this.
      setError(originalIndex, row, throwable, server);
    } else if (isActionComplete(originalIndex, row)) {
      canRetry = Retry.NO_OTHER_SUCCEEDED;
    }
    return canRetry;
  }

  /**
   * Fail all the actions from this multiaction after an OperationTimeoutExceededException
   * @param actions    the actions still to do from the initial list
   * @param server     the destination
   * @param numAttempt the number of attempts so far
   * @param throwable  the throwable that caused the failure
   */
  private void failAll(MultiAction actions, ServerName server, int numAttempt,
    Throwable throwable) {
    int failed = 0;
    for (Map.Entry<byte[], List<Action>> e : actions.actions.entrySet()) {
      for (Action action : e.getValue()) {
        setError(action.getOriginalIndex(), action.getAction(), throwable, server);
        ++failed;
      }
    }
    logNoResubmit(server, numAttempt, actions.size(), throwable, failed, 0);
  }

  /**
   * Resubmit all the actions from this multiaction after a failure.
   * @param rsActions  the actions still to do from the initial list
   * @param server     the destination
   * @param numAttempt the number of attempts so far
   * @param t          the throwable (if any) that caused the resubmit
   */
  private void receiveGlobalFailure(MultiAction rsActions, ServerName server, int numAttempt,
    Throwable t, boolean clearServerCache) {
    errorsByServer.reportServerError(server);
    Retry canRetry = errorsByServer.canTryMore(numAttempt)
      ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;

    // Do not update cache if exception is from failing to submit action to thread pool
    if (clearServerCache) {
      cleanServerCache(server, t);
    }

    int failed = 0;
    int stopped = 0;
    List<Action> toReplay = new ArrayList<>();
    for (Map.Entry<byte[], List<Action>> e : rsActions.actions.entrySet()) {
      byte[] regionName = e.getKey();
      byte[] row = e.getValue().get(0).getAction().getRow();
      // Do not use the exception for updating cache because it might be coming from
      // any of the regions in the MultiAction and do not update cache if exception is
      // from failing to submit action to thread pool
      if (clearServerCache) {
        updateCachedLocations(server, regionName, row,
          ClientExceptionsUtil.isMetaClearingException(t) ? null : t);
      }
      for (Action action : e.getValue()) {
        Retry retry =
          manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, server);
        if (retry == Retry.YES) {
          toReplay.add(action);
        } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
          ++stopped;
        } else {
          ++failed;
        }
      }
    }

    if (toReplay.isEmpty()) {
      logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped);
    } else {
      resubmit(server, toReplay, numAttempt, rsActions.size(), t);
    }
  }

  /**
   * Log as much info as possible, and, if there is something to replay, submit it again after a
   * back off sleep.
   */
  private void resubmit(ServerName oldServer, List<Action> toReplay, int numAttempt,
    int failureCount, Throwable throwable) {
    // We have something to replay. We're going to sleep a little before.

    // We have two contradicting needs here:
    // 1) We want to get the new location after having slept, as it may change.
    // 2) We want to take into account the location when calculating the sleep time.
    // 3) If all this is just because the response needed to be chunked try again FAST.
    // It should be possible to have some heuristics to take the right decision. Short term,
    // we go for one.
    boolean retryImmediately = throwable instanceof RetryImmediatelyException;
    int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
    long backOffTime;
    if (retryImmediately) {
      backOffTime = 0;
    } else if (throwable instanceof RpcThrottlingException) {
      backOffTime = ((RpcThrottlingException) throwable).getWaitInterval();
    } else if (HBaseServerException.isServerOverloaded(throwable)) {
      // Give a special check when encountering an exception indicating the server is overloaded.
      // see #HBASE-17114 and HBASE-26807
      backOffTime = errorsByServer.calculateBackoffTime(oldServer,
        asyncProcess.connectionConfiguration.getPauseMillisForServerOverloaded());
    } else {
      backOffTime = errorsByServer.calculateBackoffTime(oldServer,
        asyncProcess.connectionConfiguration.getPauseMillis());
    }

    MetricsConnection metrics = asyncProcess.connection.getConnectionMetrics();
    if (metrics != null && HBaseServerException.isServerOverloaded(throwable)) {
      metrics.incrementServerOverloadedBackoffTime(backOffTime, TimeUnit.MILLISECONDS);
    }

    if (numAttempt > asyncProcess.startLogErrorsCnt) {
      // We use this value to have some logs when we have multiple failures, but not too many
      // logs, as errors are to be expected when a region moves, splits and so on
      LOG.info(createLog(numAttempt, failureCount, toReplay.size(), oldServer, throwable,
        backOffTime, true, null, -1, -1));
    }

    long remainingTime = getRemainingTime();
    // 1 is a special value meaning exceeded and 0 means no timeout.
    // throw if timeout already exceeded, or if backoff is larger than non-zero remaining
    if (remainingTime == 1 || (remainingTime > 0 && backOffTime > remainingTime)) {
      OperationTimeoutExceededException ex = new OperationTimeoutExceededException(
        "Backoff time of " + backOffTime + "ms would exceed operation timeout", throwable);
      for (Action actionToFail : toReplay) {
        manageError(actionToFail.getOriginalIndex(), actionToFail.getAction(),
          Retry.NO_NOT_RETRIABLE, ex, null);
      }
      return;
    }

    try {
      if (backOffTime > 0) {
        Thread.sleep(backOffTime);
      }
    } catch (InterruptedException e) {
      LOG.warn(
        "#" + asyncProcess.id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
      Thread.currentThread().interrupt();
      return;
    }

    groupAndSendMultiAction(toReplay, nextAttemptNumber);
  }

  private void logNoResubmit(ServerName oldServer, int numAttempt, int failureCount,
    Throwable throwable, int failed, int stopped) {
    if (failureCount != 0 || numAttempt > asyncProcess.startLogErrorsCnt + 1) {
      @SuppressWarnings("JavaUtilDate")
      String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
      String logMessage = createLog(numAttempt, failureCount, 0, oldServer, throwable, -1, false,
        timeStr, failed, stopped);
      if (failed != 0) {
        // Only log final failures as warning
        LOG.warn(logMessage);
      } else {
        LOG.info(logMessage);
      }
    }
  }

  /**
   * Called when we receive the result of a server query.
   * @param multiAction - the multiAction we sent
   * @param server      - the location. It's used as a server name.
   * @param responses   - the response, if any
   * @param numAttempt  - the attempt
   */
  private void receiveMultiAction(MultiAction multiAction, ServerName server,
    MultiResponse responses, int numAttempt) {
    assert responses != null;
    updateStats(server, responses);
    // Success or partial success
    // Analyze detailed results. We can still have individual failures to be redone.
    // Two specific throwables are managed:
    // - DoNotRetryIOException: we continue to retry for other actions
    // - RegionMovedException: we update the cache with the new region location
    Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
    List<Action> toReplay = new ArrayList<>();
    Throwable lastException = null;
    int failureCount = 0;
    int failed = 0;
    int stopped = 0;
    Retry retry = null;
    // Go by original action.
    for (Map.Entry<byte[], List<Action>> regionEntry : multiAction.actions.entrySet()) {
      byte[] regionName = regionEntry.getKey();

      Throwable regionException = responses.getExceptions().get(regionName);
      if (regionException != null) {
        cleanServerCache(server, regionException);
      }

      Map<Integer, Object> regionResults =
        results.containsKey(regionName) ? results.get(regionName).result : Collections.emptyMap();
      boolean regionFailureRegistered = false;
      for (Action sentAction : regionEntry.getValue()) {
        Object result = regionResults.get(sentAction.getOriginalIndex());
        if (result == null) {
          if (regionException == null) {
            LOG.error("Server sent us neither results nor exceptions for "
              + Bytes.toStringBinary(regionName) + ", numAttempt:" + numAttempt);
            regionException = new RuntimeException("Invalid response");
          }
          // If the row operation encounters a region-level error, the exception for the action
          // may be null.
          result = regionException;
        }
        // Failure: retry if it makes sense, else update the error lists
        if (result instanceof Throwable) {
          Throwable actionException = (Throwable) result;
          Row row = sentAction.getAction();
          lastException = regionException != null
            ? regionException
            : ClientExceptionsUtil.findException(actionException);
          // Register corresponding failures once per server/once per region.
          if (!regionFailureRegistered) {
            regionFailureRegistered = true;
            updateCachedLocations(server, regionName, row.getRow(), actionException);
          }
          if (retry == null) {
            errorsByServer.reportServerError(server);
            // We determine canRetry only once for all calls, after reporting server failure.
            retry = errorsByServer.canTryMore(numAttempt)
              ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
          }
          ++failureCount;
          switch (manageError(sentAction.getOriginalIndex(), row, retry, actionException, server)) {
            case YES:
              toReplay.add(sentAction);
              break;
            case NO_OTHER_SUCCEEDED:
              ++stopped;
              break;
            default:
              ++failed;
              break;
          }
        } else {
          invokeCallBack(regionName, sentAction.getAction().getRow(), (CResult) result);
          setResult(sentAction, result);
        }
      }
    }
    if (toReplay.isEmpty()) {
      logNoResubmit(server, numAttempt, failureCount, lastException, failed, stopped);
    } else {
      resubmit(server, toReplay, numAttempt, failureCount, lastException);
    }
  }

  private void updateCachedLocations(ServerName server, byte[] regionName, byte[] row,
    Throwable rowException) {
    if (tableName == null) {
      return;
    }
    try {
      asyncProcess.connection.updateCachedLocations(tableName, regionName, row, rowException,
        server);
    } catch (Throwable ex) {
      // That should never happen, but if it did, we want to make sure
      // we still process errors
      LOG.error("Couldn't update cached region locations: " + ex);
    }
  }

  private void invokeCallBack(byte[] regionName, byte[] row, CResult result) {
    if (callback != null) {
      try {
        // noinspection unchecked
        // TODO: would callback expect a replica region name if it gets one?
        this.callback.update(regionName, row, result);
      } catch (Throwable t) {
        LOG.error(
          "User callback threw an exception for " + Bytes.toStringBinary(regionName) + ", ignoring",
          t);
      }
    }
  }

  private void cleanServerCache(ServerName server, Throwable regionException) {
    if (tableName == null && ClientExceptionsUtil.isMetaClearingException(regionException)) {
      // We want to make sure to clear the cache in case there were location-related exceptions.
      // We don't want to clear the cache for every possible exception that comes through, however.
      MetricsConnection metrics = asyncProcess.connection.getConnectionMetrics();
      if (metrics != null) {
        metrics.incrCacheDroppingExceptions(regionException);
      }
      asyncProcess.connection.clearCaches(server);
    }
  }

  protected void updateStats(ServerName server, MultiResponse resp) {
    ConnectionUtils.updateStats(Optional.ofNullable(asyncProcess.connection.getStatisticsTracker()),
      Optional.ofNullable(asyncProcess.connection.getConnectionMetrics()), server, resp);
  }

  private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
    Throwable error, long backOffTime, boolean willRetry, String startTime, int failed,
    int stopped) {
    StringBuilder sb = new StringBuilder();
    sb.append("id=").append(asyncProcess.id).append(", table=").append(tableName)
      .append(", attempt=").append(numAttempt).append("/").append(asyncProcess.numTries)
      .append(", ");

    if (failureCount > 0 || error != null) {
      sb.append("failureCount=").append(failureCount).append("ops").append(", last exception=")
        .append(error);
    } else {
      sb.append("succeeded");
    }

    sb.append(" on ").append(sn).append(", tracking started ").append(startTime);

    if (willRetry) {
      sb.append(", retrying after=").append(backOffTime).append("ms")
        .append(", operationsToReplay=").append(replaySize);
    } else if (failureCount > 0) {
      if (stopped > 0) {
        sb.append("; NOT retrying, stopped=").append(stopped)
          .append(" because successful operation on other replica");
      }
      if (failed > 0) {
        sb.append("; NOT retrying, failed=").append(failed).append(" -- final attempt!");
      }
    }

    return sb.toString();
  }

  /**
   * Sets the non-error result from a particular action.
   * @param action Action (request) that the server responded to.
   * @param result The result.
   */
  private void setResult(Action action, Object result) {
    if (result == null) {
      throw new RuntimeException("Result cannot be null");
    }
    boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
    int index = action.getOriginalIndex();
    if (results == null) {
      decActionCounter(index);
      return; // Simple case, no replica requests.
    }
    ReplicaResultState state =
      trySetResultSimple(index, action.getAction(), false, result, null, isStale);
    if (state == null) {
      return; // Simple case, no replica requests.
    }
    // At this point we know that state is set to replica tracking class.
    // It could be that someone else is also looking at it; however, we know there can
    // only be one state object, and only one thread can set callCount to 0. Other threads
    // will either see state with callCount 0 after locking it, or will not see state at all,
    // since we will replace it with the result.
    synchronized (state) {
      if (state.callCount == 0) {
        return; // someone already set the result
      }
      state.callCount = 0;
    }
    synchronized (replicaResultLock) {
      if (results[index] != state) {
        throw new AssertionError("We set the callCount but someone else replaced the result");
      }
      updateResult(index, result);
    }

    decActionCounter(index);
  }

  /**
   * Sets the error from a particular action.
   * @param index     Original action index.
   * @param row       Original request.
   * @param throwable The resulting error.
   * @param server    The source server.
   */
  private void setError(int index, Row row, Throwable throwable, ServerName server) {
    if (results == null) {
      // Note that we currently cannot have replica requests with null results. So it shouldn't
      // happen that multiple replica calls will call decActionCounter for the same actions with
      // results == null. Only one call per action should be present in this case.
      errors.add(throwable, row, server);
      decActionCounter(index);
      return; // Simple case, no replica requests.
    }
    ReplicaResultState state = trySetResultSimple(index, row, true, throwable, server, false);
    if (state == null) {
      return; // Simple case, no replica requests.
    }
    BatchErrors target = null; // Error will be added to final errors, or temp replica errors.
    boolean isActionDone = false;
    synchronized (state) {
      switch (state.callCount) {
        case 0:
          return; // someone already set the result
        case 1: { // All calls failed, we are the last error.
          target = errors;
          isActionDone = true;
          break;
        }
        default: {
          assert state.callCount > 1;
          if (state.replicaErrors == null) {
            state.replicaErrors = new BatchErrors();
          }
          target = state.replicaErrors;
          break;
        }
      }
      --state.callCount;
    }
    target.add(throwable, row, server);
    if (isActionDone) {
      if (state.replicaErrors != null) { // last call, no need to lock
        errors.merge(state.replicaErrors);
      }
      // See setResult for explanations.
      synchronized (replicaResultLock) {
        if (results[index] != state) {
          throw new AssertionError("We set the callCount but someone else replaced the result");
        }
        updateResult(index, throwable);
      }
      decActionCounter(index);
    }
  }

  /**
   * Checks if the action is complete; used on error to prevent needless retries. Does not
   * synchronize, assuming element index/field accesses are atomic. This is an opportunistic
   * optimization check, doesn't have to be strict.
   * @param index Original action index.
   * @param row   Original request.
   */
  private boolean isActionComplete(int index, Row row) {
    if (!AsyncProcess.isReplicaGet(row)) return false;
    Object resObj = results[index];
    return (resObj != null)
      && (!(resObj instanceof ReplicaResultState) || ((ReplicaResultState) resObj).callCount == 0);
  }

  /**
   * Tries to set the result or error for a particular action as if there were no replica calls.
   * @return null if successful; replica state if there were in fact replica calls.
   */
  private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError, Object result,
    ServerName server, boolean isFromReplica) {
    Object resObj = null;
    if (!AsyncProcess.isReplicaGet(row)) {
      if (isFromReplica) {
        throw new AssertionError("Unexpected stale result for " + row);
      }
      updateResult(index, result);
    } else {
      synchronized (replicaResultLock) {
        resObj = results[index];
        if (resObj == null) {
          if (isFromReplica) {
            throw new AssertionError("Unexpected stale result for " + row);
          }
          updateResult(index, result);
        }
      }
    }

    ReplicaResultState rrs =
      (resObj instanceof ReplicaResultState) ? (ReplicaResultState) resObj : null;
    if (rrs == null && isError) {
      // The resObj is not replica state (null or already set).
      errors.add((Throwable) result, row, server);
    }

    if (resObj == null) {
      // resObj is null - no replica calls were made.
      decActionCounter(index);
      return null;
    }
    return rrs;
  }

  private void decActionCounter(int index) {
    long actionsRemaining = actionsInProgress.decrementAndGet();
    if (actionsRemaining < 0) {
      String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
      throw new AssertionError(error);
    } else if (actionsRemaining == 0) {
      synchronized (actionsInProgress) {
        actionsInProgress.notifyAll();
      }
    }
  }

  private String buildDetailedErrorMsg(String string, int index) {
    StringBuilder error = new StringBuilder(128);
    error.append(string).append("; called for ").append(index).append(", actionsInProgress ")
      .append(actionsInProgress.get()).append("; replica gets: ");
    if (replicaGetIndices != null) {
      for (int i = 0; i < replicaGetIndices.length; ++i) {
        error.append(replicaGetIndices[i]).append(", ");
      }
    } else {
      error.append(hasAnyReplicaGets ? "all" : "none");
    }
    error.append("; results ");
    if (results != null) {
      for (int i = 0; i < results.length; ++i) {
        Object o = results[i];
        error.append(((o == null) ? "null" : o.toString())).append(", ");
      }
    }
    return error.toString();
  }

  @Override
  public void waitUntilDone() throws InterruptedIOException {
    try {
      if (this.operationTimeout > 0) {
        // The worker thread may have died from an exception without decrementing
        // actionsInProgress; that would break the operationTimeout guarantee, so set a cutoff
        // to avoid being stuck here forever.
        long cutoff = (EnvironmentEdgeManager.currentTime() + this.operationTimeout) * 1000L;
        if (!waitUntilDone(cutoff)) {
          throw new SocketTimeoutException("time out before the actionsInProgress changed to zero");
        }
      } else {
        waitUntilDone(Long.MAX_VALUE);
      }
    } catch (InterruptedException iex) {
      throw new InterruptedIOException(iex.getMessage());
    } finally {
      if (callsInProgress != null) {
        for (CancellableRegionServerCallable clb : callsInProgress) {
          clb.cancel();
        }
      }
    }
  }

  private boolean waitUntilDone(long cutoff) throws InterruptedException {
    boolean hasWait = cutoff != Long.MAX_VALUE;
    long lastLog = EnvironmentEdgeManager.currentTime();
    long currentInProgress;
    while (0 != (currentInProgress = actionsInProgress.get())) {
      long now = EnvironmentEdgeManager.currentTime();
      if (hasWait && (now * 1000L) > cutoff) {
        return false;
      }
      if (!hasWait) { // Only log if wait is infinite.
        if (now > lastLog + 10000) {
          lastLog = now;
          LOG.info("#" + asyncProcess.id + ", waiting for " + currentInProgress
            + " actions to finish on table: " + tableName);
        }
      }
      synchronized (actionsInProgress) {
        if (actionsInProgress.get() == 0) break;
        if (!hasWait) {
          actionsInProgress.wait(10);
        } else {
          long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
          TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
        }
      }
    }
    return true;
  }

  @Override
  public boolean hasError() {
    return errors.hasErrors();
  }

  @Override
  public List<? extends Row> getFailedOperations() {
    return errors.actions;
  }

  @Override
  public RetriesExhaustedWithDetailsException getErrors() {
    return errors.makeException(asyncProcess.logBatchErrorDetails);
  }

  @Override
  public Object[] getResults() throws InterruptedIOException {
    waitUntilDone();
    return results;
  }

  /**
   * Creates the server error tracker to use inside process. Currently, to preserve the main
   * assumption about current retries, and to work well with the retry-limit-based calculation, the
   * calculation is local per Process object. We may benefit from connection-wide tracking of server
   * errors.
   * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
   */
  private ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
    return new ConnectionImplementation.ServerErrorTracker(asyncProcess.serverTrackerTimeout,
      asyncProcess.numTries);
  }

  /**
   * Create a callable. Isolated to be easily overridden in the tests.
   */
  private MultiServerCallable createCallable(final ServerName server, TableName tableName,
    final MultiAction multi) {
    return new MultiServerCallable(asyncProcess.connection, tableName, server, multi,
      asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority(),
      requestAttributes);
  }

  private void updateResult(int index, Object result) {
    Object current = results[index];
    if (current != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("The result is assigned repeatedly! current:" + current + ", new:" + result);
      }
    }
    results[index] = result;
  }

  long getNumberOfActionsInProgress() {
    return actionsInProgress.get();
  }
}