001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.net.SocketTimeoutException;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.Date;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030import java.util.Optional;
031import java.util.Set;
032import java.util.concurrent.ConcurrentHashMap;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.RejectedExecutionException;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.atomic.AtomicLong;
037import org.apache.hadoop.hbase.DoNotRetryIOException;
038import org.apache.hadoop.hbase.HBaseServerException;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.HRegionLocation;
041import org.apache.hadoop.hbase.RegionLocations;
042import org.apache.hadoop.hbase.RetryImmediatelyException;
043import org.apache.hadoop.hbase.ServerName;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
046import org.apache.hadoop.hbase.client.coprocessor.Batch;
047import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
048import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055/**
 * The context, and return value, for a single submit/submitAll call. A note on how this class (one
 * AP submit) works: initially, all requests are split into groups by server; a request is sent to
 * each server in parallel; the RPC calls are not async, so a thread per server is used. Every time
 * some actions fail, regions/locations might have changed, so we re-group the failed actions by
 * server and region and send these groups in parallel too. In case of retries, the result is a
 * "tree" of threads, with each parent exiting after scheduling its children. This is why much of
 * the code does not require any synchronization.
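 * <p>
 * A minimal usage sketch (simplified, and assuming the usual {@code AsyncProcess#submit} entry
 * point; callers normally reach this class only through that machinery):
 *
 * <pre>{@code
 * AsyncRequestFuture future = asyncProcess.submit(task); // one AsyncRequestFutureImpl per submit
 * future.waitUntilDone();                                // blocks until all actions have finished
 * if (future.hasError()) {
 *   throw future.getErrors();  // RetriesExhaustedWithDetailsException describing failed actions
 * }
 * Object[] results = future.getResults(); // per-action results (or exceptions), by action index
 * }</pre>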
063 */
064@InterfaceAudience.Private
065class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
066
067  private static final Logger LOG = LoggerFactory.getLogger(AsyncRequestFutureImpl.class);
068
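  // Tracks elapsed time against the operation timeout across retries. Only created in the
  // constructor when the caller did not supply its own callable (which carries its own tracker);
  // see getRemainingTime().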
069  private RetryingTimeTracker tracker;
070
071  /**
   * Runnable (that can be submitted to a thread pool) that waits until it is time to issue replica
   * calls, finds region replicas, groups the requests by replica and issues the calls (on separate
   * threads, via sendMultiAction). This is done on a separate thread because we don't want to
   * block the user thread with our asynchronous call, and usually we have to wait before making
   * replica calls.
077   */
078  private final class ReplicaCallIssuingRunnable implements Runnable {
079    private final long startTime;
080    private final List<Action> initialActions;
081
082    public ReplicaCallIssuingRunnable(List<Action> initialActions, long startTime) {
083      this.initialActions = initialActions;
084      this.startTime = startTime;
085    }
086
087    @Override
088    public void run() {
089      boolean done = false;
090      if (asyncProcess.primaryCallTimeoutMicroseconds > 0) {
091        try {
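          // Note on units: startTime is in milliseconds, while the cutoff passed to waitUntilDone
          // is in microseconds (waitUntilDone compares "now * 1000L" against it).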
092          done = waitUntilDone(startTime * 1000L + asyncProcess.primaryCallTimeoutMicroseconds);
093        } catch (InterruptedException ex) {
094          LOG.error("Replica thread interrupted - no replica calls {}", ex.getMessage());
095          return;
096        }
097      }
098      if (done) return; // Done within primary timeout
099      Map<ServerName, MultiAction> actionsByServer = new HashMap<>();
100      List<Action> unknownLocActions = new ArrayList<>();
101      if (replicaGetIndices == null) {
102        for (int i = 0; i < results.length; ++i) {
103          addReplicaActions(i, actionsByServer, unknownLocActions);
104        }
105      } else {
        for (int replicaGetIndex : replicaGetIndices) {
          addReplicaActions(replicaGetIndex, actionsByServer, unknownLocActions);
108        }
109      }
110      if (!actionsByServer.isEmpty()) {
111        sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
112      }
113      if (!unknownLocActions.isEmpty()) {
114        actionsByServer = new HashMap<>();
115        for (Action action : unknownLocActions) {
116          addReplicaActionsAgain(action, actionsByServer);
117        }
        // Some actions may have completely failed; they are handled in addReplicaActionsAgain.
119        if (!actionsByServer.isEmpty()) {
120          sendMultiAction(actionsByServer, 1, null, true);
121        }
122      }
123    }
124
125    /**
126     * Add replica actions to action map by server.
     * @param index                 Index of the original action.
     * @param actionsByServer       The map by server to add it to.
     * @param unknownReplicaActions Collects replica actions whose location is currently unknown.
129     */
130    private void addReplicaActions(int index, Map<ServerName, MultiAction> actionsByServer,
131      List<Action> unknownReplicaActions) {
132      if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
133      Action action = initialActions.get(index);
134      RegionLocations loc = findAllLocationsOrFail(action, true);
135      if (loc == null) return;
136      HRegionLocation[] locs = loc.getRegionLocations();
137      if (locs.length == 1) {
138        LOG.warn("No replicas found for {}", action.getAction());
139        return;
140      }
141      synchronized (replicaResultLock) {
142        // Don't run replica calls if the original has finished. We could do it e.g. if
143        // original has already failed before first replica call (unlikely given retries),
144        // but that would require additional synchronization w.r.t. returning to caller.
145        if (results[index] != null) return;
146        // We set the number of calls here. After that any path must call setResult/setError.
147        // True even for replicas that are not found - if we refuse to send we MUST set error.
148        updateResult(index, new ReplicaResultState(locs.length));
149      }
150      for (int i = 1; i < locs.length; ++i) {
151        Action replicaAction = new Action(action, i);
152        if (locs[i] != null) {
153          asyncProcess.addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
154            replicaAction, actionsByServer, nonceGroup);
155        } else {
156          unknownReplicaActions.add(replicaAction);
157        }
158      }
159    }
160
161    private void addReplicaActionsAgain(Action action,
162      Map<ServerName, MultiAction> actionsByServer) {
163      if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
164        throw new AssertionError("Cannot have default replica here");
165      }
166      HRegionLocation loc = getReplicaLocationOrFail(action);
167      if (loc == null) return;
168      asyncProcess.addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(), action,
169        actionsByServer, nonceGroup);
170    }
171  }
172
173  /**
174   * Runnable (that can be submitted to thread pool) that submits MultiAction to a single server.
175   * The server call is synchronous, therefore we do it on a thread pool.
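   * In rough terms: build (or reuse) a callable for the server, invoke it through the
   * RpcRetryingCaller without retries, then hand the response to receiveMultiAction on success, or
   * to receiveGlobalFailure / failAll on error, always decrementing the per-server task counters
   * in a finally block.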
176   */
177  final class SingleServerRequestRunnable implements Runnable {
178    private final MultiAction multiAction;
179    private final int numAttempt;
180    private final ServerName server;
181    private final Set<CancellableRegionServerCallable> callsInProgress;
182
183    SingleServerRequestRunnable(MultiAction multiAction, int numAttempt, ServerName server,
184      Set<CancellableRegionServerCallable> callsInProgress) {
185      this.multiAction = multiAction;
186      this.numAttempt = numAttempt;
187      this.server = server;
188      this.callsInProgress = callsInProgress;
189    }
190
191    @Override
192    public void run() {
193      AbstractResponse res = null;
194      CancellableRegionServerCallable callable = currentCallable;
195      try {
196        // setup the callable based on the actions, if we don't have one already from the request
197        if (callable == null) {
198          callable = createCallable(server, tableName, multiAction);
199        }
200        RpcRetryingCaller<AbstractResponse> caller =
201          asyncProcess.createCaller(callable, rpcTimeout);
202        try {
203          if (callsInProgress != null) {
204            callsInProgress.add(callable);
205          }
206          res = caller.callWithoutRetries(callable, operationTimeout);
207          if (res == null) {
208            // Cancelled
209            return;
210          }
211        } catch (OperationTimeoutExceededException e) {
          // The operation timed out before the actual callable could execute. This may be due to
          // a slow/hotspotted meta or an operation timeout set too low for the number of requests.
          // Circumventing the usual failure flow ensures the meta cache is not cleared, which
          // would otherwise result in a doomed feedback loop where the meta keeps being hotspotted.
          // See HBASE-27487
217          failAll(multiAction, server, numAttempt, e);
218          return;
219        } catch (IOException e) {
          // The service itself failed. It may be an error coming from the communication layer,
          // or a functional error raised by the server.
222          receiveGlobalFailure(multiAction, server, numAttempt, e, true);
223          return;
224        } catch (Throwable t) {
225          // This should not happen. Let's log & retry anyway.
226          LOG.error("id=" + asyncProcess.id + ", caught throwable. Unexpected."
227            + " Retrying. Server=" + server + ", tableName=" + tableName, t);
228          receiveGlobalFailure(multiAction, server, numAttempt, t, true);
229          return;
230        }
231        if (res.type() == AbstractResponse.ResponseType.MULTI) {
232          // Normal case: we received an answer from the server, and it's not an exception.
233          receiveMultiAction(multiAction, server, (MultiResponse) res, numAttempt);
234        } else {
235          if (results != null) {
236            SingleResponse singleResponse = (SingleResponse) res;
237            updateResult(0, singleResponse.getEntry());
238          }
239          decActionCounter(1);
240        }
241      } catch (Throwable t) {
242        // Something really bad happened. We are on the send thread that will now die.
243        LOG.error("id=" + asyncProcess.id + " error for " + tableName + " processing " + server, t);
244        throw new RuntimeException(t);
245      } finally {
246        asyncProcess.decTaskCounters(multiAction.getRegions(), server);
247        if (callsInProgress != null && callable != null && res != null) {
248          callsInProgress.remove(callable);
249        }
250      }
251    }
252  }
253
254  private final Batch.Callback<CResult> callback;
255  private final BatchErrors errors;
256  private final ConnectionImplementation.ServerErrorTracker errorsByServer;
257  private final ExecutorService pool;
258  private final Set<CancellableRegionServerCallable> callsInProgress;
259
260  private final TableName tableName;
261  private final AtomicLong actionsInProgress = new AtomicLong(-1);
262  /**
263   * The lock controls access to results. It is only held when populating results where there might
264   * be several callers (eventual consistency gets). For other requests, there's one unique call
265   * going on per result index.
266   */
267  private final Object replicaResultLock = new Object();
268  /**
   * Result array. Null if results are not needed. Otherwise, each index corresponds to the action
   * index in the initially submitted actions. For most request types, it holds nulls for requests
   * that are not done, and a result/exception for those that are done. For eventual-consistency
   * gets, initially the same applies; at some point, replica calls might be started, and a
   * ReplicaResultState is put at the corresponding indices. The returning calls check the type to
   * detect when this is the case. After all calls are done, the ReplicaResultState objects are
   * replaced with results for the user.
276   */
277  private final Object[] results;
278  /**
279   * Indices of replica gets in results. If null, all or no actions are replica-gets.
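   * For example (illustrative): for the actions {replica-get, put, replica-get} this becomes
   * {0, 2}, while for a batch of only replica-gets it stays null and hasAnyReplicaGets is true.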
280   */
281  private final int[] replicaGetIndices;
282  private final boolean hasAnyReplicaGets;
283  private final long nonceGroup;
284  private final CancellableRegionServerCallable currentCallable;
285  private final int operationTimeout;
286  private final int rpcTimeout;
287  private final AsyncProcess asyncProcess;
288  private final Map<String, byte[]> requestAttributes;
289
290  /**
291   * For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only
   * used to make logging clearer; we don't actually care why we don't retry.
293   */
294  public enum Retry {
295    YES,
296    NO_LOCATION_PROBLEM,
297    NO_NOT_RETRIABLE,
298    NO_RETRIES_EXHAUSTED,
299    NO_OTHER_SUCCEEDED
300  }
301
302  /**
303   * Sync point for calls to multiple replicas for the same user request (Get). Created and put in
304   * the results array (we assume replica calls require results) when the replica calls are
   * launched. See the results field for details of this process. This is a POJO whose fields are
   * accessed directly; to modify them, the object itself is locked.
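   * As a worked example: with three replica calls, callCount starts at 3; the first call to come
   * back with a result sets it to 0 and publishes that result, and later calls then see 0 and drop
   * theirs. If every call fails, the last failure (callCount going from 1 to 0) merges
   * replicaErrors into the global errors and publishes the throwable instead.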
307   */
308  private static class ReplicaResultState {
309    public ReplicaResultState(int callCount) {
310      this.callCount = callCount;
311    }
312
313    /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
314    int callCount;
315    /**
316     * Errors for which it is not decided whether we will report them to user. If one of the calls
317     * succeeds, we will discard the errors that may have happened in the other calls.
318     */
319    BatchErrors replicaErrors = null;
320
321    @Override
322    public String toString() {
323      return "[call count " + callCount + "; errors " + replicaErrors + "]";
324    }
325  }
326
327  public AsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions, long nonceGroup,
328    AsyncProcess asyncProcess) {
329    this.pool = task.getPool();
330    this.callback = task.getCallback();
331    this.nonceGroup = nonceGroup;
332    this.tableName = task.getTableName();
333    this.actionsInProgress.set(actions.size());
334    if (task.getResults() == null) {
335      results = task.getNeedResults() ? new Object[actions.size()] : null;
336    } else {
337      if (task.getResults().length != actions.size()) {
338        throw new AssertionError("results.length");
339      }
340      this.results = task.getResults();
341      for (int i = 0; i != this.results.length; ++i) {
342        results[i] = null;
343      }
344    }
345    List<Integer> replicaGetIndices = null;
346    boolean hasAnyReplicaGets = false;
347    if (results != null) {
348      // Check to see if any requests might require replica calls.
349      // We expect that many requests will consist of all or no multi-replica gets; in such
350      // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
351      // store the list of action indexes for which replica gets are possible, and set
352      // hasAnyReplicaGets to true.
353      boolean hasAnyNonReplicaReqs = false;
354      int posInList = 0;
355      for (Action action : actions) {
356        boolean isReplicaGet = AsyncProcess.isReplicaGet(action.getAction());
357        if (isReplicaGet) {
358          hasAnyReplicaGets = true;
359          if (hasAnyNonReplicaReqs) { // Mixed case
360            if (replicaGetIndices == null) {
361              replicaGetIndices = new ArrayList<>(actions.size() - 1);
362            }
363            replicaGetIndices.add(posInList);
364          }
365        } else if (!hasAnyNonReplicaReqs) {
366          // The first non-multi-replica request in the action list.
367          hasAnyNonReplicaReqs = true;
368          if (posInList > 0) {
369            // Add all the previous requests to the index lists. We know they are all
370            // replica-gets because this is the first non-multi-replica request in the list.
371            replicaGetIndices = new ArrayList<>(actions.size() - 1);
372            for (int i = 0; i < posInList; ++i) {
373              replicaGetIndices.add(i);
374            }
375          }
376        }
377        ++posInList;
378      }
379    }
380    this.hasAnyReplicaGets = hasAnyReplicaGets;
381    if (replicaGetIndices != null) {
382      this.replicaGetIndices = new int[replicaGetIndices.size()];
383      int i = 0;
384      for (Integer el : replicaGetIndices) {
385        this.replicaGetIndices[i++] = el;
386      }
387    } else {
388      this.replicaGetIndices = null;
389    }
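    // Track in-flight callables only when replica calls are possible, so that waitUntilDone() can
    // cancel whatever is still outstanding when it returns.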
390    this.callsInProgress = !hasAnyReplicaGets
391      ? null
392      : Collections
393        .newSetFromMap(new ConcurrentHashMap<CancellableRegionServerCallable, Boolean>());
394    this.asyncProcess = asyncProcess;
395    this.errorsByServer = createServerErrorTracker();
396    this.errors = new BatchErrors();
397    this.operationTimeout = task.getOperationTimeout();
398    this.rpcTimeout = task.getRpcTimeout();
399    this.currentCallable = task.getCallable();
400    if (task.getCallable() == null) {
401      tracker = new RetryingTimeTracker().start();
402    }
403    this.requestAttributes = task.getRequestAttributes();
404  }
405
406  protected Set<CancellableRegionServerCallable> getCallsInProgress() {
407    return callsInProgress;
408  }
409
410  SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt,
411    ServerName server, Set<CancellableRegionServerCallable> callsInProgress) {
412    return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress);
413  }
414
415  /**
416   * Some checked calls send a callable with their own tracker. This method checks the operation
417   * timeout against the appropriate tracker, or returns false if no tracker.
418   */
419  private boolean isOperationTimeoutExceeded() {
420    // return value of 1 is special to mean exceeded, to differentiate from 0
421    // which is no timeout. see implementation of RetryingTimeTracker.getRemainingTime
422    return getRemainingTime() == 1;
423  }
424
425  private long getRemainingTime() {
426    RetryingTimeTracker currentTracker;
427    if (tracker != null) {
428      currentTracker = tracker;
429    } else if (currentCallable != null && currentCallable.getTracker() != null) {
430      currentTracker = currentCallable.getTracker();
431    } else {
432      return 0;
433    }
434
435    // no-op if already started, this is just to ensure it was initialized (usually true)
436    currentTracker.start();
437
438    return currentTracker.getRemainingTime(operationTimeout);
439  }
440
441  /**
   * Groups a list of actions per region server, and sends them.
   * @param currentActions - the list of rows to submit
   * @param numAttempt     - the current attempt number (the first attempt is 1)
445   */
446  void groupAndSendMultiAction(List<Action> currentActions, int numAttempt) {
447    Map<ServerName, MultiAction> actionsByServer = new HashMap<>();
448
449    boolean isReplica = false;
450    List<Action> unknownReplicaActions = null;
451    for (Action action : currentActions) {
452      if (isOperationTimeoutExceeded()) {
453        String message = numAttempt == 1
454          ? "Operation timeout exceeded during resolution of region locations, "
455            + "prior to executing any actions."
456          : "Operation timeout exceeded during re-resolution of region locations on retry "
457            + (numAttempt - 1) + ".";
458
459        message += " Meta may be slow or operation timeout too short for batch size or retries.";
460        OperationTimeoutExceededException exception =
461          new OperationTimeoutExceededException(message);
462
463        // Clear any actions we already resolved, because none will have been executed yet
464        // We are going to fail all passed actions because there's no way we can execute any
465        // if operation timeout is exceeded.
466        actionsByServer.clear();
467        for (Action actionToFail : currentActions) {
468          manageLocationError(actionToFail, exception);
469        }
470        return;
471      }
472
473      RegionLocations locs = findAllLocationsOrFail(action, true);
474      if (locs == null) continue;
475      boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
476      if (isReplica && !isReplicaAction) {
477        // This is the property of the current implementation, not a requirement.
478        throw new AssertionError("Replica and non-replica actions in the same retry");
479      }
480      isReplica = isReplicaAction;
481      HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
482      if (loc == null || loc.getServerName() == null) {
483        if (isReplica) {
484          if (unknownReplicaActions == null) {
485            unknownReplicaActions = new ArrayList<>(1);
486          }
487          unknownReplicaActions.add(action);
488        } else {
489          // TODO: relies on primary location always being fetched
490          manageLocationError(action, null);
491        }
492      } else {
493        byte[] regionName = loc.getRegionInfo().getRegionName();
494        AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer,
495          nonceGroup);
496      }
497    }
498    boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
499    boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();
500
501    if (!actionsByServer.isEmpty()) {
      // If this is the first attempt to group and send, and these are not replica calls yet, we
      // need the replica thread.
503      sendMultiAction(actionsByServer, numAttempt,
504        (doStartReplica && !hasUnknown) ? currentActions : null, numAttempt > 1 && !hasUnknown);
505    }
506
507    if (hasUnknown) {
508      actionsByServer = new HashMap<>();
509      for (Action action : unknownReplicaActions) {
510        HRegionLocation loc = getReplicaLocationOrFail(action);
511        if (loc == null) continue;
512        byte[] regionName = loc.getRegionInfo().getRegionName();
513        AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer,
514          nonceGroup);
515      }
516      if (!actionsByServer.isEmpty()) {
517        sendMultiAction(actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
518      }
519    }
520  }
521
522  private HRegionLocation getReplicaLocationOrFail(Action action) {
    // We are going to try to get the location once again. For each action, we first check the
    // cache once more, because the previous calls in the loop might have populated it.
525    int replicaId = action.getReplicaId();
526    RegionLocations locs = findAllLocationsOrFail(action, true);
527    if (locs == null) return null; // manageError already called
528    HRegionLocation loc = locs.getRegionLocation(replicaId);
529    if (loc == null || loc.getServerName() == null) {
530      locs = findAllLocationsOrFail(action, false);
531      if (locs == null) return null; // manageError already called
532      loc = locs.getRegionLocation(replicaId);
533    }
534    if (loc == null || loc.getServerName() == null) {
535      manageLocationError(action, null);
536      return null;
537    }
538    return loc;
539  }
540
541  private void manageLocationError(Action action, Exception ex) {
542    String msg =
543      "Cannot get replica " + action.getReplicaId() + " location for " + action.getAction();
544    LOG.error(msg);
545    if (ex == null) {
546      ex = new IOException(msg);
547    }
548    manageError(action.getOriginalIndex(), action.getAction(), Retry.NO_LOCATION_PROBLEM, ex, null);
549  }
550
551  private RegionLocations findAllLocationsOrFail(Action action, boolean useCache) {
552    if (action.getAction() == null)
553      throw new IllegalArgumentException("#" + asyncProcess.id + ", row cannot be null");
554    RegionLocations loc = null;
555    try {
556      loc = asyncProcess.connection.locateRegion(tableName, action.getAction().getRow(), useCache,
557        true, action.getReplicaId());
558    } catch (IOException ex) {
559      manageLocationError(action, ex);
560    }
561    return loc;
562  }
563
564  /**
565   * Send a multi action structure to the servers, after a delay depending on the attempt number.
566   * Asynchronous.
   * @param actionsByServer         the actions grouped by server (each MultiAction is itself
   *                                structured by region)
   * @param numAttempt              the attempt number.
   * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
   * @param reuseThread             whether the last runnable may be executed on the current thread
570   */
571  void sendMultiAction(Map<ServerName, MultiAction> actionsByServer, int numAttempt,
572    List<Action> actionsForReplicaThread, boolean reuseThread) {
573    boolean clearServerCache = true;
574    // Run the last item on the same thread if we are already on a send thread.
575    // We hope most of the time it will be the only item, so we can cut down on threads.
576    int actionsRemaining = actionsByServer.size();
577    // This iteration is by server (the HRegionLocation comparator is by server portion only).
578    for (Map.Entry<ServerName, MultiAction> e : actionsByServer.entrySet()) {
579      ServerName server = e.getKey();
580      MultiAction multiAction = e.getValue();
581      Collection<? extends Runnable> runnables =
582        getNewMultiActionRunnable(server, multiAction, numAttempt);
583      // make sure we correctly count the number of runnables before we try to reuse the send
584      // thread, in case we had to split the request into different runnables because of backoff
585      if (runnables.size() > actionsRemaining) {
586        actionsRemaining = runnables.size();
587      }
588
589      // run all the runnables
      // HBASE-17475: Do not reuse the thread once the stack reaches a certain depth, to prevent
      // stack overflow.
      // For now, we use HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER to control the depth.
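      // Concretely: the last runnable of the batch runs inline (when reuseThread is set), except
      // on every DEFAULT_HBASE_CLIENT_RETRIES_NUMBER-th attempt, where it goes to the pool so the
      // recursion depth resets.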
593      for (Runnable runnable : runnables) {
594        if (
595          (--actionsRemaining == 0) && reuseThread
596            && numAttempt % HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER != 0
597        ) {
598          runnable.run();
599        } else {
600          try {
601            pool.execute(runnable);
602          } catch (Throwable t) {
603            if (t instanceof RejectedExecutionException) {
604              // This should never happen. But as the pool is provided by the end user,
605              // let's secure this a little.
606              LOG.warn("id=" + asyncProcess.id + ", task rejected by pool. Unexpected." + " Server="
607                + server.getServerName(), t);
608              // Do not update cache if exception is from failing to submit action to thread pool
609              clearServerCache = false;
610            } else {
611              // see #HBASE-14359 for more details
612              LOG.warn("Caught unexpected exception/error: ", t);
613            }
614            asyncProcess.decTaskCounters(multiAction.getRegions(), server);
615            // We're likely to fail again, but this will increment the attempt counter,
616            // so it will finish.
617            receiveGlobalFailure(multiAction, server, numAttempt, t, clearServerCache);
618          }
619        }
620      }
621    }
622
623    if (actionsForReplicaThread != null) {
624      startWaitingForReplicaCalls(actionsForReplicaThread);
625    }
626  }
627
628  @SuppressWarnings("MixedMutabilityReturnType")
629  private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
630    MultiAction multiAction, int numAttempt) {
631    // no stats to manage, just do the standard action
632    if (asyncProcess.connection.getStatisticsTracker() == null) {
633      if (asyncProcess.connection.getConnectionMetrics() != null) {
634        asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
635      }
636      asyncProcess.incTaskCounters(multiAction.getRegions(), server);
637      SingleServerRequestRunnable runnable =
638        createSingleServerRequest(multiAction, numAttempt, server, callsInProgress);
639
640      // remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable
641      return Collections.singletonList(runnable);
642    }
643
644    // group the actions by the amount of delay
645    Map<Long, DelayingRunner> actions = new HashMap<>(multiAction.size());
646
647    // split up the actions
648    for (Map.Entry<byte[], List<Action>> e : multiAction.actions.entrySet()) {
649      Long backoff = getBackoff(server, e.getKey());
650      DelayingRunner runner = actions.get(backoff);
651      if (runner == null) {
652        actions.put(backoff, new DelayingRunner(backoff, e));
653      } else {
654        runner.add(e);
655      }
656    }
657
658    List<Runnable> toReturn = new ArrayList<>(actions.size());
659    for (DelayingRunner runner : actions.values()) {
660      asyncProcess.incTaskCounters(runner.getActions().getRegions(), server);
661      Runnable runnable =
662        createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress);
663      // use a delay runner only if we need to sleep for some time
664      if (runner.getSleepTime() > 0) {
665        runner.setRunner(runnable);
666        runnable = runner;
667        if (asyncProcess.connection.getConnectionMetrics() != null) {
668          asyncProcess.connection.getConnectionMetrics()
669            .incrDelayRunnersAndUpdateDelayInterval(runner.getSleepTime());
670        }
671      } else {
672        if (asyncProcess.connection.getConnectionMetrics() != null) {
673          asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
674        }
675      }
676      // remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable
677      toReturn.add(runnable);
678
679    }
680    return toReturn;
681  }
682
683  /**
684   * @param server     server location where the target region is hosted
685   * @param regionName name of the region which we are going to write some data
   * @param regionName name of the region to which we are going to write some data
   * @return the amount of time the client should wait before it submits a request to the specified
688   */
689  private Long getBackoff(ServerName server, byte[] regionName) {
690    ServerStatisticTracker tracker = asyncProcess.connection.getStatisticsTracker();
691    ServerStatistics stats = tracker.getStats(server);
692    return asyncProcess.connection.getBackoffPolicy().getBackoffTime(server, regionName, stats);
693  }
694
695  /**
696   * Starts waiting to issue replica calls on a different thread; or issues them immediately.
697   */
698  private void startWaitingForReplicaCalls(List<Action> actionsForReplicaThread) {
699    long startTime = EnvironmentEdgeManager.currentTime();
700    ReplicaCallIssuingRunnable replicaRunnable =
701      new ReplicaCallIssuingRunnable(actionsForReplicaThread, startTime);
702    if (asyncProcess.primaryCallTimeoutMicroseconds == 0) {
703      // Start replica calls immediately.
704      replicaRunnable.run();
705    } else {
706      // Start the thread that may kick off replica gets.
707      // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea.
708      try {
709        pool.execute(replicaRunnable);
710      } catch (RejectedExecutionException ree) {
711        LOG.warn("id=" + asyncProcess.id + " replica task rejected by pool; no replica calls", ree);
712      }
713    }
714  }
715
716  /**
   * Checks whether we can retry and acts accordingly: logs and sets the error status.
   * @param originalIndex the position in the list sent
   * @param row           the row
   * @param canRetry      if not {@link Retry#YES}, we won't retry whatever the settings.
   * @param throwable     the throwable, if any (can be null)
   * @param server        the location, if any (can be null)
   * @return the retry decision; {@link Retry#YES} if the action can be retried.
724   */
725  Retry manageError(int originalIndex, Row row, Retry canRetry, Throwable throwable,
726    ServerName server) {
727    if (canRetry == Retry.YES && throwable != null && throwable instanceof DoNotRetryIOException) {
728      canRetry = Retry.NO_NOT_RETRIABLE;
729    }
730
731    if (canRetry != Retry.YES) {
732      // Batch.Callback<Res> was not called on failure in 0.94. We keep this.
733      setError(originalIndex, row, throwable, server);
734    } else if (isActionComplete(originalIndex, row)) {
735      canRetry = Retry.NO_OTHER_SUCCEEDED;
736    }
737    return canRetry;
738  }
739
740  /**
741   * Fail all the actions from this multiaction after an OperationTimeoutExceededException
742   * @param actions    the actions still to do from the initial list
743   * @param server     the destination
744   * @param numAttempt the number of attempts so far
745   * @param throwable  the throwable that caused the failure
746   */
747  private void failAll(MultiAction actions, ServerName server, int numAttempt,
748    Throwable throwable) {
749    int failed = 0;
750    for (Map.Entry<byte[], List<Action>> e : actions.actions.entrySet()) {
751      for (Action action : e.getValue()) {
752        setError(action.getOriginalIndex(), action.getAction(), throwable, server);
753        ++failed;
754      }
755    }
756    logNoResubmit(server, numAttempt, actions.size(), throwable, failed, 0);
757  }
758
759  /**
   * Resubmit all the actions from this multiaction after a failure.
   * @param rsActions        the actions still to do from the initial list
   * @param server           the destination
   * @param numAttempt       the number of attempts so far
   * @param t                the throwable (if any) that caused the resubmit
   * @param clearServerCache whether cached region locations for this server should be
   *                         cleared/updated
765   */
766  private void receiveGlobalFailure(MultiAction rsActions, ServerName server, int numAttempt,
767    Throwable t, boolean clearServerCache) {
768    errorsByServer.reportServerError(server);
769    Retry canRetry = errorsByServer.canTryMore(numAttempt) ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
770
771    // Do not update cache if exception is from failing to submit action to thread pool
772    if (clearServerCache) {
773      cleanServerCache(server, t);
774    }
775
776    int failed = 0;
777    int stopped = 0;
778    List<Action> toReplay = new ArrayList<>();
779    for (Map.Entry<byte[], List<Action>> e : rsActions.actions.entrySet()) {
780      byte[] regionName = e.getKey();
781      byte[] row = e.getValue().get(0).getAction().getRow();
      // Do not use the exception for updating the cache because it might be coming from any of
      // the regions in the MultiAction; also, do not update the cache at all if the exception
      // came from failing to submit the action to the thread pool.
785      if (clearServerCache) {
786        updateCachedLocations(server, regionName, row,
787          ClientExceptionsUtil.isMetaClearingException(t) ? null : t);
788      }
789      for (Action action : e.getValue()) {
790        Retry retry =
791          manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, server);
792        if (retry == Retry.YES) {
793          toReplay.add(action);
794        } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
795          ++stopped;
796        } else {
797          ++failed;
798        }
799      }
800    }
801
802    if (toReplay.isEmpty()) {
803      logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped);
804    } else {
805      resubmit(server, toReplay, numAttempt, rsActions.size(), t);
806    }
807  }
808
809  /**
810   * Log as much info as possible, and, if there is something to replay, submit it again after a
811   * back off sleep.
812   */
813  private void resubmit(ServerName oldServer, List<Action> toReplay, int numAttempt,
814    int failureCount, Throwable throwable) {
    // We have something to replay. We're going to sleep a little beforehand.

    // We have a few contradicting needs here:
    // 1) We want to get the new location after having slept, as it may change.
    // 2) We want to take into account the location when calculating the sleep time.
    // 3) If all this is just because the response needed to be chunked, try again FAST.
    // It should be possible to have some heuristics to take the right decision. Short term,
    // we go for one.
823    boolean retryImmediately = throwable instanceof RetryImmediatelyException;
824    int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
825    long backOffTime;
826    if (retryImmediately) {
827      backOffTime = 0;
828    } else if (throwable instanceof RpcThrottlingException) {
829      backOffTime = ((RpcThrottlingException) throwable).getWaitInterval();
830    } else if (HBaseServerException.isServerOverloaded(throwable)) {
831      // Give a special check when encountering an exception indicating the server is overloaded.
832      // see #HBASE-17114 and HBASE-26807
833      backOffTime = errorsByServer.calculateBackoffTime(oldServer,
834        asyncProcess.connectionConfiguration.getPauseMillisForServerOverloaded());
835    } else {
836      backOffTime = errorsByServer.calculateBackoffTime(oldServer,
837        asyncProcess.connectionConfiguration.getPauseMillis());
838    }
839
840    MetricsConnection metrics = asyncProcess.connection.getConnectionMetrics();
841    if (metrics != null && HBaseServerException.isServerOverloaded(throwable)) {
842      metrics.incrementServerOverloadedBackoffTime(backOffTime, TimeUnit.MILLISECONDS);
843    }
844
845    if (numAttempt > asyncProcess.startLogErrorsCnt) {
      // We use this value to emit some logs when we have multiple failures, but not too many
      // logs, as errors are to be expected when a region moves, splits, and so on.
848      LOG.info(createLog(numAttempt, failureCount, toReplay.size(), oldServer, throwable,
849        backOffTime, true, null, -1, -1));
850    }
851
852    long remainingTime = getRemainingTime();
    // 1 is a special value meaning exceeded and 0 means no timeout.
    // Fail the actions if the timeout is already exceeded, or if the backoff is larger than the
    // non-zero remaining time.
855    if (remainingTime == 1 || (remainingTime > 0 && backOffTime > remainingTime)) {
856      OperationTimeoutExceededException ex = new OperationTimeoutExceededException(
857        "Backoff time of " + backOffTime + "ms would exceed operation timeout", throwable);
858      for (Action actionToFail : toReplay) {
859        manageError(actionToFail.getOriginalIndex(), actionToFail.getAction(),
860          Retry.NO_NOT_RETRIABLE, ex, null);
861      }
862      return;
863    }
864
865    try {
866      if (backOffTime > 0) {
867        Thread.sleep(backOffTime);
868      }
869    } catch (InterruptedException e) {
870      LOG.warn(
871        "#" + asyncProcess.id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
872      Thread.currentThread().interrupt();
873      return;
874    }
875
876    groupAndSendMultiAction(toReplay, nextAttemptNumber);
877  }
878
879  private void logNoResubmit(ServerName oldServer, int numAttempt, int failureCount,
880    Throwable throwable, int failed, int stopped) {
881    if (failureCount != 0 || numAttempt > asyncProcess.startLogErrorsCnt + 1) {
882      @SuppressWarnings("JavaUtilDate")
883      String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
884      String logMessage = createLog(numAttempt, failureCount, 0, oldServer, throwable, -1, false,
885        timeStr, failed, stopped);
886      if (failed != 0) {
887        // Only log final failures as warning
888        LOG.warn(logMessage);
889      } else {
890        LOG.info(logMessage);
891      }
892    }
893  }
894
895  /**
896   * Called when we receive the result of a server query.
897   * @param multiAction - the multiAction we sent
898   * @param server      - the location. It's used as a server name.
899   * @param responses   - the response, if any
900   * @param numAttempt  - the attempt
901   */
902  private void receiveMultiAction(MultiAction multiAction, ServerName server,
903    MultiResponse responses, int numAttempt) {
904    assert responses != null;
905    updateStats(server, responses);
    // Success or partial success.
    // Analyze detailed results. We can still have individual failures that need to be redone.
    // Two specific throwables are managed:
909    // - DoNotRetryIOException: we continue to retry for other actions
910    // - RegionMovedException: we update the cache with the new region location
911    Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
912    List<Action> toReplay = new ArrayList<>();
913    Throwable lastException = null;
914    int failureCount = 0;
915    int failed = 0;
916    int stopped = 0;
917    Retry retry = null;
918    // Go by original action.
919    for (Map.Entry<byte[], List<Action>> regionEntry : multiAction.actions.entrySet()) {
920      byte[] regionName = regionEntry.getKey();
921
922      Throwable regionException = responses.getExceptions().get(regionName);
923      if (regionException != null) {
924        cleanServerCache(server, regionException);
925      }
926
927      Map<Integer, Object> regionResults =
928        results.containsKey(regionName) ? results.get(regionName).result : Collections.emptyMap();
929      boolean regionFailureRegistered = false;
930      for (Action sentAction : regionEntry.getValue()) {
931        Object result = regionResults.get(sentAction.getOriginalIndex());
932        if (result == null) {
933          if (regionException == null) {
934            LOG.error("Server sent us neither results nor exceptions for "
935              + Bytes.toStringBinary(regionName) + ", numAttempt:" + numAttempt);
936            regionException = new RuntimeException("Invalid response");
937          }
          // If the row operation encountered a region-level error, the per-action exception may
          // be null.
940          result = regionException;
941        }
        // Failure: retry if it makes sense, else update the error lists.
943        if (result instanceof Throwable) {
944          Throwable actionException = (Throwable) result;
945          Row row = sentAction.getAction();
946          lastException = regionException != null
947            ? regionException
948            : ClientExceptionsUtil.findException(actionException);
949          // Register corresponding failures once per server/once per region.
950          if (!regionFailureRegistered) {
951            regionFailureRegistered = true;
952            updateCachedLocations(server, regionName, row.getRow(), actionException);
953          }
954          if (retry == null) {
955            errorsByServer.reportServerError(server);
956            // We determine canRetry only once for all calls, after reporting server failure.
957            retry = errorsByServer.canTryMore(numAttempt) ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
958          }
959          ++failureCount;
960          switch (manageError(sentAction.getOriginalIndex(), row, retry, actionException, server)) {
961            case YES:
962              toReplay.add(sentAction);
963              break;
964            case NO_OTHER_SUCCEEDED:
965              ++stopped;
966              break;
967            default:
968              ++failed;
969              break;
970          }
971        } else {
972          invokeCallBack(regionName, sentAction.getAction().getRow(), (CResult) result);
973          setResult(sentAction, result);
974        }
975      }
976    }
977    if (toReplay.isEmpty()) {
978      logNoResubmit(server, numAttempt, failureCount, lastException, failed, stopped);
979    } else {
980      resubmit(server, toReplay, numAttempt, failureCount, lastException);
981    }
982  }
983
984  private void updateCachedLocations(ServerName server, byte[] regionName, byte[] row,
985    Throwable rowException) {
986    if (tableName == null) {
987      return;
988    }
989    try {
990      asyncProcess.connection.updateCachedLocations(tableName, regionName, row, rowException,
991        server);
992    } catch (Throwable ex) {
993      // That should never happen, but if it did, we want to make sure
994      // we still process errors
      LOG.error("Couldn't update cached region locations", ex);
996    }
997  }
998
999  private void invokeCallBack(byte[] regionName, byte[] row, CResult result) {
1000    if (callback != null) {
1001      try {
1002        // noinspection unchecked
1003        // TODO: would callback expect a replica region name if it gets one?
1004        this.callback.update(regionName, row, result);
1005      } catch (Throwable t) {
1006        LOG.error(
1007          "User callback threw an exception for " + Bytes.toStringBinary(regionName) + ", ignoring",
1008          t);
1009      }
1010    }
1011  }
1012
1013  private void cleanServerCache(ServerName server, Throwable regionException) {
1014    if (tableName == null && ClientExceptionsUtil.isMetaClearingException(regionException)) {
1015      // We want to make sure to clear the cache in case there were location-related exceptions.
      // However, we don't want to clear the cache for every possible exception that comes through.
1017      MetricsConnection metrics = asyncProcess.connection.getConnectionMetrics();
1018      if (metrics != null) {
1019        metrics.incrCacheDroppingExceptions(regionException);
1020      }
1021      asyncProcess.connection.clearCaches(server);
1022    }
1023  }
1024
1025  protected void updateStats(ServerName server, MultiResponse resp) {
1026    ConnectionUtils.updateStats(Optional.ofNullable(asyncProcess.connection.getStatisticsTracker()),
1027      Optional.ofNullable(asyncProcess.connection.getConnectionMetrics()), server, resp);
1028  }
1029
1030  private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
1031    Throwable error, long backOffTime, boolean willRetry, String startTime, int failed,
1032    int stopped) {
1033    StringBuilder sb = new StringBuilder();
1034    sb.append("id=").append(asyncProcess.id).append(", table=").append(tableName)
1035      .append(", attempt=").append(numAttempt).append("/").append(asyncProcess.numTries)
1036      .append(", ");
1037
1038    if (failureCount > 0 || error != null) {
1039      sb.append("failureCount=").append(failureCount).append("ops").append(", last exception=")
1040        .append(error);
1041    } else {
1042      sb.append("succeeded");
1043    }
1044
1045    sb.append(" on ").append(sn).append(", tracking started ").append(startTime);
1046
1047    if (willRetry) {
1048      sb.append(", retrying after=").append(backOffTime).append("ms")
1049        .append(", operationsToReplay=").append(replaySize);
1050    } else if (failureCount > 0) {
1051      if (stopped > 0) {
1052        sb.append("; NOT retrying, stopped=").append(stopped)
1053          .append(" because successful operation on other replica");
1054      }
1055      if (failed > 0) {
1056        sb.append("; NOT retrying, failed=").append(failed).append(" -- final attempt!");
1057      }
1058    }
1059
1060    return sb.toString();
1061  }
1062
1063  /**
1064   * Sets the non-error result from a particular action.
1065   * @param action Action (request) that the server responded to.
1066   * @param result The result.
1067   */
1068  private void setResult(Action action, Object result) {
1069    if (result == null) {
1070      throw new RuntimeException("Result cannot be null");
1071    }
1072    boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
1073    int index = action.getOriginalIndex();
1074    if (results == null) {
1075      decActionCounter(index);
1076      return; // Simple case, no replica requests.
1077    }
1078    ReplicaResultState state =
1079      trySetResultSimple(index, action.getAction(), false, result, null, isStale);
1080    if (state == null) {
1081      return; // Simple case, no replica requests.
1082    }
1083    // At this point we know that state is set to replica tracking class.
1084    // It could be that someone else is also looking at it; however, we know there can
1085    // only be one state object, and only one thread can set callCount to 0. Other threads
    // will either see the state with callCount 0 after locking it, or will not see the state at
    // all because we will have replaced it with the result.
1088    synchronized (state) {
1089      if (state.callCount == 0) {
1090        return; // someone already set the result
1091      }
1092      state.callCount = 0;
1093    }
1094    synchronized (replicaResultLock) {
1095      if (results[index] != state) {
1096        throw new AssertionError("We set the callCount but someone else replaced the result");
1097      }
1098      updateResult(index, result);
1099    }
1100
1101    decActionCounter(index);
1102  }
1103
1104  /**
1105   * Sets the error from a particular action.
1106   * @param index     Original action index.
1107   * @param row       Original request.
1108   * @param throwable The resulting error.
1109   * @param server    The source server.
1110   */
1111  private void setError(int index, Row row, Throwable throwable, ServerName server) {
1112    if (results == null) {
      // Note that we currently cannot have replica requests with null results. So it shouldn't
      // happen that multiple replica calls will call decActionCounter for the same action with
      // results == null. Only one call per action should be present in this case.
1116      errors.add(throwable, row, server);
1117      decActionCounter(index);
1118      return; // Simple case, no replica requests.
1119    }
1120    ReplicaResultState state = trySetResultSimple(index, row, true, throwable, server, false);
1121    if (state == null) {
1122      return; // Simple case, no replica requests.
1123    }
1124    BatchErrors target = null; // Error will be added to final errors, or temp replica errors.
1125    boolean isActionDone = false;
1126    synchronized (state) {
1127      switch (state.callCount) {
1128        case 0:
1129          return; // someone already set the result
1130        case 1: { // All calls failed, we are the last error.
1131          target = errors;
1132          isActionDone = true;
1133          break;
1134        }
1135        default: {
1136          assert state.callCount > 1;
1137          if (state.replicaErrors == null) {
1138            state.replicaErrors = new BatchErrors();
1139          }
1140          target = state.replicaErrors;
1141          break;
1142        }
1143      }
1144      --state.callCount;
1145    }
1146    target.add(throwable, row, server);
1147    if (isActionDone) {
1148      if (state.replicaErrors != null) { // last call, no need to lock
1149        errors.merge(state.replicaErrors);
1150      }
1151      // See setResult for explanations.
1152      synchronized (replicaResultLock) {
1153        if (results[index] != state) {
1154          throw new AssertionError("We set the callCount but someone else replaced the result");
1155        }
1156        updateResult(index, throwable);
1157      }
1158      decActionCounter(index);
1159    }
1160  }
1161
1162  /**
1163   * Checks if the action is complete; used on error to prevent needless retries. Does not
1164   * synchronize, assuming element index/field accesses are atomic. This is an opportunistic
1165   * optimization check, doesn't have to be strict.
1166   * @param index Original action index.
1167   * @param row   Original request.
1168   */
1169  private boolean isActionComplete(int index, Row row) {
1170    if (!AsyncProcess.isReplicaGet(row)) return false;
1171    Object resObj = results[index];
1172    return (resObj != null)
1173      && (!(resObj instanceof ReplicaResultState) || ((ReplicaResultState) resObj).callCount == 0);
1174  }
1175
1176  /**
1177   * Tries to set the result or error for a particular action as if there were no replica calls.
1178   * @return null if successful; replica state if there were in fact replica calls.
1179   */
1180  private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError, Object result,
1181    ServerName server, boolean isFromReplica) {
1182    Object resObj = null;
1183    if (!AsyncProcess.isReplicaGet(row)) {
1184      if (isFromReplica) {
1185        throw new AssertionError("Unexpected stale result for " + row);
1186      }
1187      updateResult(index, result);
1188    } else {
1189      synchronized (replicaResultLock) {
1190        resObj = results[index];
1191        if (resObj == null) {
1192          if (isFromReplica) {
1193            throw new AssertionError("Unexpected stale result for " + row);
1194          }
1195          updateResult(index, result);
1196        }
1197      }
1198    }
1199
1200    ReplicaResultState rrs =
1201      (resObj instanceof ReplicaResultState) ? (ReplicaResultState) resObj : null;
1202    if (rrs == null && isError) {
1203      // The resObj is not replica state (null or already set).
1204      errors.add((Throwable) result, row, server);
1205    }
1206
1207    if (resObj == null) {
1208      // resObj is null - no replica calls were made.
1209      decActionCounter(index);
1210      return null;
1211    }
1212    return rrs;
1213  }
1214
1215  private void decActionCounter(int index) {
1216    long actionsRemaining = actionsInProgress.decrementAndGet();
1217    if (actionsRemaining < 0) {
1218      String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
1219      throw new AssertionError(error);
1220    } else if (actionsRemaining == 0) {
1221      synchronized (actionsInProgress) {
1222        actionsInProgress.notifyAll();
1223      }
1224    }
1225  }
1226
1227  private String buildDetailedErrorMsg(String string, int index) {
1228    StringBuilder error = new StringBuilder(128);
1229    error.append(string).append("; called for ").append(index).append(", actionsInProgress ")
1230      .append(actionsInProgress.get()).append("; replica gets: ");
1231    if (replicaGetIndices != null) {
1232      for (int i = 0; i < replicaGetIndices.length; ++i) {
1233        error.append(replicaGetIndices[i]).append(", ");
1234      }
1235    } else {
1236      error.append(hasAnyReplicaGets ? "all" : "none");
1237    }
1238    error.append("; results ");
1239    if (results != null) {
1240      for (int i = 0; i < results.length; ++i) {
1241        Object o = results[i];
1242        error.append(((o == null) ? "null" : o.toString())).append(", ");
1243      }
1244    }
1245    return error.toString();
1246  }
1247
1248  @Override
1249  public void waitUntilDone() throws InterruptedIOException {
1250    try {
1251      if (this.operationTimeout > 0) {
        // The worker thread may have died from an exception without decrementing
        // actionsInProgress; that would break the operationTimeout guarantee, so we set a cutoff
        // to avoid being stuck here forever.
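        // Note: the cutoff is in microseconds, matching the "now * 1000L" comparison inside
        // waitUntilDone(long).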
1255        long cutoff = (EnvironmentEdgeManager.currentTime() + this.operationTimeout) * 1000L;
1256        if (!waitUntilDone(cutoff)) {
1257          throw new SocketTimeoutException("time out before the actionsInProgress changed to zero");
1258        }
1259      } else {
1260        waitUntilDone(Long.MAX_VALUE);
1261      }
1262    } catch (InterruptedException iex) {
1263      throw new InterruptedIOException(iex.getMessage());
1264    } finally {
1265      if (callsInProgress != null) {
1266        for (CancellableRegionServerCallable clb : callsInProgress) {
1267          clb.cancel();
1268        }
1269      }
1270    }
1271  }
1272
1273  private boolean waitUntilDone(long cutoff) throws InterruptedException {
1274    boolean hasWait = cutoff != Long.MAX_VALUE;
1275    long lastLog = EnvironmentEdgeManager.currentTime();
1276    long currentInProgress;
1277    while (0 != (currentInProgress = actionsInProgress.get())) {
1278      long now = EnvironmentEdgeManager.currentTime();
1279      if (hasWait && (now * 1000L) > cutoff) {
1280        return false;
1281      }
1282      if (!hasWait) { // Only log if wait is infinite.
1283        if (now > lastLog + 10000) {
1284          lastLog = now;
1285          LOG.info("#" + asyncProcess.id + ", waiting for " + currentInProgress
1286            + "  actions to finish on table: " + tableName);
1287        }
1288      }
1289      synchronized (actionsInProgress) {
1290        if (actionsInProgress.get() == 0) break;
1291        if (!hasWait) {
1292          actionsInProgress.wait(10);
1293        } else {
1294          long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
1295          TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
1296        }
1297      }
1298    }
1299    return true;
1300  }
1301
1302  @Override
1303  public boolean hasError() {
1304    return errors.hasErrors();
1305  }
1306
1307  @Override
1308  public List<? extends Row> getFailedOperations() {
1309    return errors.actions;
1310  }
1311
1312  @Override
1313  public RetriesExhaustedWithDetailsException getErrors() {
1314    return errors.makeException(asyncProcess.logBatchErrorDetails);
1315  }
1316
1317  @Override
1318  public Object[] getResults() throws InterruptedIOException {
1319    waitUntilDone();
1320    return results;
1321  }
1322
1323  /**
   * Creates the server error tracker to use inside the process. Currently, to preserve the main
   * assumption about current retries, and to work well with the retry-limit-based calculation, the
   * calculation is local per Process object. We may benefit from connection-wide tracking of server
   * errors.
   * @return the ServerErrorTracker to use for this request (never null)
1329   */
1330  private ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
1331    return new ConnectionImplementation.ServerErrorTracker(asyncProcess.serverTrackerTimeout,
1332      asyncProcess.numTries);
1333  }
1334
1335  /**
1336   * Create a callable. Isolated to be easily overridden in the tests.
1337   */
1338  private MultiServerCallable createCallable(final ServerName server, TableName tableName,
1339    final MultiAction multi) {
1340    return new MultiServerCallable(asyncProcess.connection, tableName, server, multi,
1341      asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority(),
1342      requestAttributes);
1343  }
1344
1345  private void updateResult(int index, Object result) {
1346    Object current = results[index];
1347    if (current != null) {
1348      if (LOG.isDebugEnabled()) {
1349        LOG.debug("The result is assigned repeatedly! current:" + current + ", new:" + result);
1350      }
1351    }
1352    results[index] = result;
1353  }
1354
1355  long getNumberOfActionsInProgress() {
1356    return actionsInProgress.get();
1357  }
1358}