001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
021
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.lang.reflect.UndeclaredThrowableException;
025import java.net.SocketTimeoutException;
026import java.time.Instant;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicBoolean;
031import org.apache.hadoop.hbase.DoNotRetryIOException;
032import org.apache.hadoop.hbase.HBaseServerException;
033import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
034import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
036import org.apache.hadoop.hbase.util.ExceptionUtil;
037import org.apache.hadoop.ipc.RemoteException;
038import org.apache.hadoop.util.StringUtils;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
044
045/**
046 * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client threadlocal outstanding timeouts
047 * as so we don't persist too much. Dynamic rather than static so can set the generic appropriately.
048 * This object has a state. It should not be used by in parallel by different threads. Reusing it is
049 * possible however, even between multiple threads. However, the user will have to manage the
050 * synchronization on its side: there is no synchronization inside the class.
051 */
052@InterfaceAudience.Private
053public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
054  // LOG is being used in TestMultiRowRangeFilter, hence leaving it public
055  public static final Logger LOG = LoggerFactory.getLogger(RpcRetryingCallerImpl.class);
056
057  /** How many retries are allowed before we start to log */
058  private final int startLogErrorsCnt;
059
060  private final long pause;
061  private final long pauseForServerOverloaded;
062  private final int maxAttempts;// how many times to try
063  private final int rpcTimeout;// timeout for each rpc request
064  private final AtomicBoolean cancelled = new AtomicBoolean(false);
065  private final RetryingCallerInterceptor interceptor;
066  private final RetryingCallerInterceptorContext context;
067  private final RetryingTimeTracker tracker;
068  private final MetricsConnection metrics;
069
070  public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries,
071    RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout,
072    MetricsConnection metricsConnection) {
073    this.pause = pause;
074    this.pauseForServerOverloaded = pauseForServerOverloaded;
075    this.maxAttempts = retries2Attempts(retries);
076    this.interceptor = interceptor;
077    context = interceptor.createEmptyContext();
078    this.startLogErrorsCnt = startLogErrorsCnt;
079    this.tracker = new RetryingTimeTracker();
080    this.rpcTimeout = rpcTimeout;
081    this.metrics = metricsConnection;
082  }
083
084  @Override
085  public void cancel() {
086    cancelled.set(true);
087    synchronized (cancelled) {
088      cancelled.notifyAll();
089    }
090  }
091
092  @Override
093  public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
094    throws IOException, RuntimeException {
095    List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions = new ArrayList<>();
096    tracker.start();
097    context.clear();
098    for (int tries = 0;; tries++) {
099      long expectedSleep;
100      try {
101        // bad cache entries are cleared in the call to RetryingCallable#throwable() in catch block
102        callable.prepare(tries != 0);
103        interceptor.intercept(context.prepare(callable, tries));
104        return callable.call(getTimeout(callTimeout));
105      } catch (PreemptiveFastFailException e) {
106        throw e;
107      } catch (Throwable t) {
108        ExceptionUtil.rethrowIfInterrupt(t);
109        Throwable cause = t.getCause();
110        if (cause instanceof DoNotRetryIOException) {
111          // Fail fast
112          throw (DoNotRetryIOException) cause;
113        }
114        // translateException throws exception when should not retry: i.e. when request is bad.
115        interceptor.handleFailure(context, t);
116        t = translateException(t);
117
118        if (tries > startLogErrorsCnt) {
119          if (LOG.isInfoEnabled()) {
120            StringBuilder builder = new StringBuilder("Call exception, tries=").append(tries)
121              .append(", retries=").append(maxAttempts).append(", started=")
122              .append((EnvironmentEdgeManager.currentTime() - tracker.getStartTime()))
123              .append(" ms ago, ").append("cancelled=").append(cancelled.get()).append(", msg=")
124              .append(t.getMessage()).append(", details=")
125              .append(callable.getExceptionMessageAdditionalDetail())
126              .append(", see https://s.apache.org/timeout");
127            if (LOG.isDebugEnabled()) {
128              builder.append(", exception=").append(StringUtils.stringifyException(t));
129              LOG.debug(builder.toString());
130            } else {
131              LOG.info(builder.toString());
132            }
133          }
134        }
135
136        callable.throwable(t, maxAttempts != 1);
137        RetriesExhaustedException.ThrowableWithExtraContext qt =
138          new RetriesExhaustedException.ThrowableWithExtraContext(t,
139            EnvironmentEdgeManager.currentTime(), toString());
140        exceptions.add(qt);
141        if (tries >= maxAttempts - 1) {
142          throw new RetriesExhaustedException(tries, exceptions);
143        }
144
145        if (t instanceof RpcThrottlingException) {
146          RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) t;
147          expectedSleep = rpcThrottlingException.getWaitInterval();
148          if (LOG.isDebugEnabled()) {
149            LOG.debug("Sleeping for {}ms after catching RpcThrottlingException", expectedSleep,
150              rpcThrottlingException);
151          }
152        } else {
153          expectedSleep =
154            HBaseServerException.isServerOverloaded(t) ? pauseForServerOverloaded : pause;
155
156          // only factor in retry adjustment for non-RpcThrottlingExceptions
157          // because RpcThrottlingExceptions tell you how long to wait
158
159          // If the server is dead, we need to wait a little before retrying, to give
160          // a chance to the regions to be moved
161          // get right pause time, start by RETRY_BACKOFF[0] * pauseBase, where pauseBase might be
162          // special when encountering an exception indicating the server is overloaded.
163          // see #HBASE-17114 and HBASE-26807
164          expectedSleep = callable.sleep(expectedSleep, tries);
165        }
166
167        // If, after the planned sleep, there won't be enough time left, we stop now.
168        long duration = singleCallDuration(expectedSleep);
169        if (duration > callTimeout) {
170          String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration + ": "
171            + t.getMessage() + " " + callable.getExceptionMessageAdditionalDetail();
172          throw (SocketTimeoutException) new SocketTimeoutException(msg).initCause(t);
173        }
174        if (metrics != null && HBaseServerException.isServerOverloaded(t)) {
175          metrics.incrementServerOverloadedBackoffTime(expectedSleep, TimeUnit.MILLISECONDS);
176        }
177      } finally {
178        interceptor.updateFailureInfo(context);
179      }
180      try {
181        if (expectedSleep > 0) {
182          synchronized (cancelled) {
183            if (cancelled.get()) return null;
184            cancelled.wait(expectedSleep);
185          }
186        }
187        if (cancelled.get()) return null;
188      } catch (InterruptedException e) {
189        throw new InterruptedIOException(
190          "Interrupted after " + tries + " tries while maxAttempts=" + maxAttempts);
191      }
192    }
193  }
194
195  /** Returns Calculate how long a single call took */
196  private long singleCallDuration(final long expectedSleep) {
197    return (EnvironmentEdgeManager.currentTime() - tracker.getStartTime()) + expectedSleep;
198  }
199
200  @Override
201  public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
202    throws IOException, RuntimeException {
203    // The code of this method should be shared with withRetries.
204    try {
205      callable.prepare(false);
206      return callable.call(callTimeout);
207    } catch (Throwable t) {
208      Throwable t2 = translateException(t);
209      ExceptionUtil.rethrowIfInterrupt(t2);
210      // It would be nice to clear the location cache here.
211      if (t2 instanceof IOException) {
212        throw (IOException) t2;
213      } else {
214        throw new RuntimeException(t2);
215      }
216    }
217  }
218
219  /**
220   * Get the good or the remote exception if any, throws the DoNotRetryIOException.
221   * @param t the throwable to analyze
222   * @return the translated exception, if it's not a DoNotRetryIOException
223   * @throws DoNotRetryIOException - if we find it, we throw it instead of translating.
224   */
225  static Throwable translateException(Throwable t) throws DoNotRetryIOException {
226    if (t instanceof UndeclaredThrowableException) {
227      if (t.getCause() != null) {
228        t = t.getCause();
229      }
230    }
231    if (t instanceof RemoteException) {
232      t = ((RemoteException) t).unwrapRemoteException();
233    }
234    if (t instanceof LinkageError) {
235      throw new DoNotRetryIOException(t);
236    }
237    if (t instanceof ServiceException) {
238      ServiceException se = (ServiceException) t;
239      Throwable cause = se.getCause();
240      if (cause instanceof DoNotRetryIOException) {
241        throw (DoNotRetryIOException) cause;
242      }
243      // Don't let ServiceException out; its rpc specific.
244      // It also could be a RemoteException, so go around again.
245      t = translateException(cause);
246    } else if (t instanceof DoNotRetryIOException) {
247      throw (DoNotRetryIOException) t;
248    }
249    return t;
250  }
251
252  private int getTimeout(int callTimeout) {
253    int timeout = tracker.getRemainingTime(callTimeout);
254    if (timeout <= 0 || (rpcTimeout > 0 && rpcTimeout < timeout)) {
255      timeout = rpcTimeout;
256    }
257    return timeout;
258  }
259
260  @Override
261  public String toString() {
262    return "RpcRetryingCaller{" + "globalStartTime=" + Instant.ofEpochMilli(tracker.getStartTime())
263      + ", pause=" + pause + ", maxAttempts=" + maxAttempts + '}';
264  }
265}