001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
024import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
025import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
026import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
027
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.Collections;
031import java.util.HashMap;
032import java.util.IdentityHashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Optional;
036import java.util.OptionalLong;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentLinkedQueue;
040import java.util.concurrent.ConcurrentMap;
041import java.util.concurrent.ConcurrentSkipListMap;
042import java.util.concurrent.TimeUnit;
043import java.util.function.Supplier;
044import java.util.stream.Collectors;
045import java.util.stream.Stream;
046import org.apache.commons.lang3.mutable.MutableBoolean;
047import org.apache.hadoop.hbase.CellScannable;
048import org.apache.hadoop.hbase.DoNotRetryIOException;
049import org.apache.hadoop.hbase.HBaseServerException;
050import org.apache.hadoop.hbase.HConstants;
051import org.apache.hadoop.hbase.HRegionLocation;
052import org.apache.hadoop.hbase.RetryImmediatelyException;
053import org.apache.hadoop.hbase.ServerName;
054import org.apache.hadoop.hbase.TableName;
055import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
056import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
057import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
058import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
059import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
060import org.apache.hadoop.hbase.ipc.HBaseRpcController;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
063import org.apache.yetus.audience.InterfaceAudience;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067import org.apache.hbase.thirdparty.io.netty.util.Timer;
068
069import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
070import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
073
074/**
 * Retry caller for batch operations.
 * <p>
 * Note that {@link #operationTimeoutNs} is the time limit for the whole batch call, the same as
 * for other single operations.
 * <p>
 * {@link #maxAttempts}, on the other hand, is logically a limit for each single operation in the
 * batch. In the implementation we record a {@code tries} parameter for each operation group, and
 * if a group is split into several sub groups when retrying, the sub groups inherit the
 * {@code tries}. You can imagine the whole retrying process as a tree, with {@link #maxAttempts}
 * being the limit on the depth of that tree.
085 */
086@InterfaceAudience.Private
087class AsyncBatchRpcRetryingCaller<T> {
088
089  private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchRpcRetryingCaller.class);
090
091  private final Timer retryTimer;
092
093  private final AsyncConnectionImpl conn;
094
095  private final TableName tableName;
096
097  private final List<Action> actions;
098
099  private final List<CompletableFuture<T>> futures;
100
101  private final IdentityHashMap<Action, CompletableFuture<T>> action2Future;
102
103  private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors;
104
105  private final int maxAttempts;
106
107  private final long operationTimeoutNs;
108
109  private final long rpcTimeoutNs;
110
111  private final int startLogErrorsCnt;
112
113  private final long startNs;
114
115  private final HBaseServerExceptionPauseManager pauseManager;
116
117  private final Map<String, byte[]> requestAttributes;
118
119  // we can not use HRegionLocation as the map key because the hashCode and equals method of
120  // HRegionLocation only consider serverName.
121  private static final class RegionRequest {
122
123    public final HRegionLocation loc;
124
125    public final ConcurrentLinkedQueue<Action> actions = new ConcurrentLinkedQueue<>();
126
127    public RegionRequest(HRegionLocation loc) {
128      this.loc = loc;
129    }
130  }
131
132  private static final class ServerRequest {
133
134    public final ConcurrentMap<byte[], RegionRequest> actionsByRegion =
135      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
136
137    public void addAction(HRegionLocation loc, Action action) {
138      computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(),
139        () -> new RegionRequest(loc)).actions.add(action);
140    }
141
142    public void setRegionRequest(byte[] regionName, RegionRequest regionReq) {
143      actionsByRegion.put(regionName, regionReq);
144    }
145
146    public int getPriority() {
147      return actionsByRegion.values().stream().flatMap(rr -> rr.actions.stream())
148        .mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET);
149    }
150  }
151
152  public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
153    TableName tableName, List<? extends Row> actions, long pauseNs, long pauseNsForServerOverloaded,
154    int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt,
155    Map<String, byte[]> requestAttributes) {
156    this.retryTimer = retryTimer;
157    this.conn = conn;
158    this.tableName = tableName;
159    this.maxAttempts = maxAttempts;
160    this.operationTimeoutNs = operationTimeoutNs;
161    this.rpcTimeoutNs = rpcTimeoutNs;
162    this.startLogErrorsCnt = startLogErrorsCnt;
163    this.actions = new ArrayList<>(actions.size());
164    this.futures = new ArrayList<>(actions.size());
165    this.action2Future = new IdentityHashMap<>(actions.size());
166    this.pauseManager =
167      new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
168    for (int i = 0, n = actions.size(); i < n; i++) {
169      Row rawAction = actions.get(i);
170      Action action;
171      if (rawAction instanceof OperationWithAttributes) {
172        action = new Action(rawAction, i, ((OperationWithAttributes) rawAction).getPriority());
173      } else {
174        action = new Action(rawAction, i);
175      }
176      if (hasIncrementOrAppend(rawAction)) {
177        action.setNonce(conn.getNonceGenerator().newNonce());
178      }
179      this.actions.add(action);
180      CompletableFuture<T> future = new CompletableFuture<>();
181      futures.add(future);
182      action2Future.put(action, future);
183    }
184    this.action2Errors = new IdentityHashMap<>();
185    this.startNs = System.nanoTime();
186    this.requestAttributes = requestAttributes;
187  }
188
189  private static boolean hasIncrementOrAppend(Row action) {
190    if (action instanceof Append || action instanceof Increment) {
191      return true;
192    } else if (action instanceof RowMutations) {
193      return hasIncrementOrAppend((RowMutations) action);
194    } else if (action instanceof CheckAndMutate) {
195      return hasIncrementOrAppend(((CheckAndMutate) action).getAction());
196    }
197    return false;
198  }
199
200  private static boolean hasIncrementOrAppend(RowMutations mutations) {
201    for (Mutation mutation : mutations.getMutations()) {
202      if (mutation instanceof Append || mutation instanceof Increment) {
203        return true;
204      }
205    }
206    return false;
207  }
208
209  private List<ThrowableWithExtraContext> removeErrors(Action action) {
210    synchronized (action2Errors) {
211      return action2Errors.remove(action);
212    }
213  }
214
215  private void logException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier,
216    Throwable error, ServerName serverName) {
217    if (tries > startLogErrorsCnt) {
218      String regions =
219        regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'")
220          .collect(Collectors.joining(",", "[", "]"));
221      LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName
222        + " failed, tries=" + tries, error);
223    }
224  }
225
226  private String getExtraContextForError(ServerName serverName) {
227    return serverName != null ? serverName.getServerName() : "";
228  }
229
230  private void addError(Action action, Throwable error, ServerName serverName) {
231    List<ThrowableWithExtraContext> errors;
232    synchronized (action2Errors) {
233      errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>());
234    }
235    errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
236      getExtraContextForError(serverName)));
237  }
238
239  private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) {
240    actions.forEach(action -> addError(action, error, serverName));
241  }
242
243  private void failOne(Action action, int tries, Throwable error, long currentTime, String extras) {
244    CompletableFuture<T> future = action2Future.get(action);
245    if (future.isDone()) {
246      return;
247    }
248    ThrowableWithExtraContext errorWithCtx =
249      new ThrowableWithExtraContext(error, currentTime, extras);
250    List<ThrowableWithExtraContext> errors = removeErrors(action);
251    if (errors == null) {
252      errors = Collections.singletonList(errorWithCtx);
253    } else {
254      errors.add(errorWithCtx);
255    }
256    future.completeExceptionally(new RetriesExhaustedException(tries - 1, errors));
257  }
258
259  private void failAll(Stream<Action> actions, int tries, Throwable error, ServerName serverName) {
260    long currentTime = EnvironmentEdgeManager.currentTime();
261    String extras = getExtraContextForError(serverName);
262    actions.forEach(action -> failOne(action, tries, error, currentTime, extras));
263  }
264
265  private void failAll(Stream<Action> actions, int tries) {
266    actions.forEach(action -> {
267      CompletableFuture<T> future = action2Future.get(action);
268      if (future.isDone()) {
269        return;
270      }
271      future.completeExceptionally(new RetriesExhaustedException(tries,
272        Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
273    });
274  }
275
276  private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion,
277    List<CellScannable> cells, Map<Integer, Integer> indexMap) throws IOException {
278    ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
279    ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder();
280    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
281    ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
282    for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
283      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
284      // multiRequestBuilder will be populated with region actions.
285      // indexMap will be non-empty after the call if there is RowMutations/CheckAndMutate in the
286      // action list.
287      RequestConverter.buildNoDataRegionActions(entry.getKey(),
288        entry.getValue().actions.stream()
289          .sorted((a1, a2) -> Integer.compare(a1.getOriginalIndex(), a2.getOriginalIndex()))
290          .collect(Collectors.toList()),
291        cells, multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup,
292        indexMap);
293    }
294    return multiRequestBuilder.build();
295  }
296
297  @SuppressWarnings("unchecked")
298  private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName,
299    RegionResult regionResult, List<Action> failedActions, Throwable regionException,
300    MutableBoolean retryImmediately) {
301    Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);
302    if (result == null) {
303      LOG.error("Server " + serverName + " sent us neither result nor exception for row '"
304        + Bytes.toStringBinary(action.getAction().getRow()) + "' of "
305        + regionReq.loc.getRegion().getRegionNameAsString());
306      addError(action, new RuntimeException("Invalid response"), serverName);
307      failedActions.add(action);
308    } else if (result instanceof Throwable) {
309      Throwable error = translateException((Throwable) result);
310      logException(tries, () -> Stream.of(regionReq), error, serverName);
311      conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
312      if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
313        failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
314          getExtraContextForError(serverName));
315      } else {
316        if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) {
317          retryImmediately.setTrue();
318        }
319        failedActions.add(action);
320      }
321    } else {
322      action2Future.get(action).complete((T) result);
323    }
324  }
325
326  private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
327    ServerName serverName, MultiResponse resp) {
328    ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
329      serverName, resp);
330    List<Action> failedActions = new ArrayList<>();
331    MutableBoolean retryImmediately = new MutableBoolean(false);
332    actionsByRegion.forEach((rn, regionReq) -> {
333      RegionResult regionResult = resp.getResults().get(rn);
334      Throwable regionException = resp.getException(rn);
335      if (regionResult != null) {
336        regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName,
337          regionResult, failedActions, regionException, retryImmediately));
338      } else {
339        Throwable error;
340        if (regionException == null) {
341          LOG.error("Server sent us neither results nor exceptions for {}",
342            Bytes.toStringBinary(rn));
343          error = new RuntimeException("Invalid response");
344        } else {
345          error = translateException(regionException);
346        }
347        logException(tries, () -> Stream.of(regionReq), error, serverName);
348        conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
349        if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
350          failAll(regionReq.actions.stream(), tries, error, serverName);
351          return;
352        }
353        if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) {
354          retryImmediately.setTrue();
355        }
356        addError(regionReq.actions, error, serverName);
357        failedActions.addAll(regionReq.actions);
358      }
359    });
360    if (!failedActions.isEmpty()) {
361      tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), null);
362    }
363  }
364
365  private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) {
366    long remainingNs;
367    if (operationTimeoutNs > 0) {
368      remainingNs = pauseManager.remainingTimeNs(startNs);
369      if (remainingNs <= 0) {
370        failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()),
371          tries);
372        return;
373      }
374    } else {
375      remainingNs = Long.MAX_VALUE;
376    }
377    ClientService.Interface stub;
378    try {
379      stub = conn.getRegionServerStub(serverName);
380    } catch (IOException e) {
381      onError(serverReq.actionsByRegion, tries, e, serverName);
382      return;
383    }
384    ClientProtos.MultiRequest req;
385    List<CellScannable> cells = new ArrayList<>();
386    // Map from a created RegionAction to the original index for a RowMutations within
387    // the original list of actions. This will be used to process the results when there
388    // is RowMutations/CheckAndMutate in the action list.
389    Map<Integer, Integer> indexMap = new HashMap<>();
390    try {
391      req = buildReq(serverReq.actionsByRegion, cells, indexMap);
392    } catch (IOException e) {
393      onError(serverReq.actionsByRegion, tries, e, serverName);
394      return;
395    }
396    HBaseRpcController controller = conn.rpcControllerFactory.newController();
397    resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
398      calcPriority(serverReq.getPriority(), tableName), tableName);
399    controller.setRequestAttributes(requestAttributes);
400    if (!cells.isEmpty()) {
401      controller.setCellScanner(createCellScanner(cells));
402    }
403    stub.multi(controller, req, resp -> {
404      if (controller.failed()) {
405        onError(serverReq.actionsByRegion, tries, controller.getFailed(), serverName);
406      } else {
407        try {
408          onComplete(serverReq.actionsByRegion, tries, serverName,
409            ResponseConverter.getResults(req, indexMap, resp, controller.cellScanner()));
410        } catch (Exception e) {
411          onError(serverReq.actionsByRegion, tries, e, serverName);
412          return;
413        }
414      }
415    });
416  }
417
418  // We will make use of the ServerStatisticTracker to determine whether we need to delay a bit,
419  // based on the load of the region server and the region.
420  private void sendOrDelay(Map<ServerName, ServerRequest> actionsByServer, int tries) {
421    Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
422    Optional<ServerStatisticTracker> optStats = conn.getStatisticsTracker();
423    if (!optStats.isPresent()) {
424      actionsByServer.forEach((serverName, serverReq) -> {
425        metrics.ifPresent(MetricsConnection::incrNormalRunners);
426        sendToServer(serverName, serverReq, tries);
427      });
428      return;
429    }
430    ServerStatisticTracker stats = optStats.get();
431    ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy();
432    actionsByServer.forEach((serverName, serverReq) -> {
433      ServerStatistics serverStats = stats.getStats(serverName);
434      Map<Long, ServerRequest> groupByBackoff = new HashMap<>();
435      serverReq.actionsByRegion.forEach((regionName, regionReq) -> {
436        long backoff = backoffPolicy.getBackoffTime(serverName, regionName, serverStats);
437        groupByBackoff.computeIfAbsent(backoff, k -> new ServerRequest())
438          .setRegionRequest(regionName, regionReq);
439      });
440      groupByBackoff.forEach((backoff, sr) -> {
441        if (backoff > 0) {
442          metrics.ifPresent(m -> m.incrDelayRunnersAndUpdateDelayInterval(backoff));
443          retryTimer.newTimeout(timer -> sendToServer(serverName, sr, tries), backoff,
444            TimeUnit.MILLISECONDS);
445        } else {
446          metrics.ifPresent(MetricsConnection::incrNormalRunners);
447          sendToServer(serverName, sr, tries);
448        }
449      });
450    });
451  }
452
453  private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Throwable t,
454    ServerName serverName) {
455    Throwable error = translateException(t);
456    logException(tries, () -> actionsByRegion.values().stream(), error, serverName);
457    actionsByRegion.forEach(
458      (rn, regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error));
459    if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
460      failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,
461        serverName);
462      return;
463    }
464    List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
465      .collect(Collectors.toList());
466    addError(copiedActions, error, serverName);
467    tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException, error);
468  }
469
470  private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
471    Throwable error) {
472    if (immediately) {
473      groupAndSend(actions, tries);
474      return;
475    }
476    OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs);
477    if (!maybePauseNsToUse.isPresent()) {
478      failAll(actions, tries);
479      return;
480    }
481    long delayNs = maybePauseNsToUse.getAsLong();
482    if (HBaseServerException.isServerOverloaded(error)) {
483      Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
484      metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
485    }
486    retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
487  }
488
489  private void groupAndSend(Stream<Action> actions, int tries) {
490    long locateTimeoutNs;
491    if (operationTimeoutNs > 0) {
492      locateTimeoutNs = pauseManager.remainingTimeNs(startNs);
493      if (locateTimeoutNs <= 0) {
494        failAll(actions, tries);
495        return;
496      }
497    } else {
498      locateTimeoutNs = -1L;
499    }
500    ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();
501    ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();
502    addListener(CompletableFuture.allOf(actions
503      .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
504        RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
505          if (error != null) {
506            error = unwrapCompletionException(translateException(error));
507            if (error instanceof DoNotRetryIOException) {
508              failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
509              return;
510            }
511            addError(action, error, null);
512            locateFailed.add(action);
513          } else {
514            computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new).addAction(loc,
515              action);
516          }
517        }))
518      .toArray(CompletableFuture[]::new)), (v, r) -> {
519        if (!actionsByServer.isEmpty()) {
520          sendOrDelay(actionsByServer, tries);
521        }
522        if (!locateFailed.isEmpty()) {
523          tryResubmit(locateFailed.stream(), tries, false, null);
524        }
525      });
526  }
527
528  public List<CompletableFuture<T>> call() {
529    groupAndSend(actions.stream(), 1);
530    return futures;
531  }
532}