001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
026import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
027import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
028
029import io.opentelemetry.context.Context;
030import io.opentelemetry.context.Scope;
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.List;
034import java.util.Map;
035import java.util.Optional;
036import java.util.OptionalLong;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.TimeUnit;
039import org.apache.hadoop.hbase.DoNotRetryIOException;
040import org.apache.hadoop.hbase.HBaseServerException;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.HRegionLocation;
043import org.apache.hadoop.hbase.NotServingRegionException;
044import org.apache.hadoop.hbase.UnknownScannerException;
045import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer;
046import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
047import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
048import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
049import org.apache.hadoop.hbase.exceptions.ScannerResetException;
050import org.apache.hadoop.hbase.ipc.HBaseRpcController;
051import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
053import org.apache.yetus.audience.InterfaceAudience;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
058import org.apache.hbase.thirdparty.io.netty.util.Timeout;
059import org.apache.hbase.thirdparty.io.netty.util.Timer;
060
061import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
062import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
063import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
068
069/**
070 * Retry caller for scanning a region.
071 * <p>
072 * We will modify the {@link Scan} object passed in directly. The upper layer should store the
073 * reference of this object and use it to open new single region scanners.
074 */
075@InterfaceAudience.Private
076class AsyncScanSingleRegionRpcRetryingCaller {
077
  private static final Logger LOG =
    LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class);

  // Timer used to schedule the delayed retry of a failed scan call and the lease renewal tasks.
  private final Timer retryTimer;

  // The scan being executed. It is modified in place (start row and limit) as the scan advances.
  private final Scan scan;

  // Metrics sink handed to the metric helper methods on every rpc and response.
  private final ScanMetrics scanMetrics;

  // Id of the scanner this caller drives; sent with every scan request.
  private final long scannerId;

  // Accumulates the raw results and decides which results are handed to the consumer.
  private final ScanResultCache resultCache;

  // Receives the results through onNext and the heartbeat/cursor notifications through onHeartbeat.
  private final AdvancedScanResultConsumer consumer;

  // The rpc stub all scan requests of this caller are sent through.
  private final ClientService.Interface stub;

  // Location of the region being scanned; used for log messages, for the table passed when
  // resetting the controller and for the region boundaries.
  private final HRegionLocation loc;

  // Passed to the rpc metric helpers together with scanMetrics.
  private final boolean regionServerRemote;

  // Priority used for the scan and renew lease calls.
  private final int priority;

  // The lease renewal task is scheduled after half of this period.
  private final long scannerLeaseTimeoutPeriodNs;

  // Once tries reaches this value a failed call is not retried any more.
  private final int maxAttempts;

  // Time budget for one next call including its retries, measured from nextCallStartNs. A value
  // which is not greater than zero makes call() pass 0 as the call timeout.
  private final long scanTimeoutNs;

  // Timeout used for the close scanner and renew lease calls.
  private final long rpcTimeoutNs;

  // Failures are only logged when tries is greater than this value.
  private final int startLogErrorsCnt;

  // Chosen in the constructor depending on whether the scan is reversed; run when the response
  // says there are no more results in the current region.
  private final Runnable completeWhenNoMoreResultsInRegion;

  protected final AsyncConnectionImpl conn;

  // The future returned by start. Completed with true or false, or exceptionally with a
  // RetriesExhaustedException.
  private final CompletableFuture<Boolean> future;

  // A single controller which is reset and reused for every rpc issued by this caller.
  private final HBaseRpcController controller;

  // Row of the last result handed to the consumer. When it is not null, completeWhenError uses it
  // as the new start row of the scan.
  private byte[] nextStartRowWhenError;

  // Whether nextStartRowWhenError is inclusive, i.e. the last result may have more cells in row.
  private boolean includeNextStartRowWhenError;

  // System.nanoTime() taken in next(); the base for the elapsed time and the remaining timeout.
  private long nextCallStartNs;

  // Attempt counter of the current call; reset to 1 in next() and increased before each retry.
  private int tries;

  // Failures collected for the current call; cleared in next().
  private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;

  // Call sequence sent with the scan requests; increased by next() and renewLease().
  private long nextCallSeq = -1L;

  // Computes the pause to wait before a retry from the exception that caused the failure.
  private final HBaseServerExceptionPauseManager pauseManager;
132
  // The states of a ScanControllerImpl.
  private enum ScanControllerState {
    // the initial state, neither suspend nor terminate has been called
    INITIALIZED,
    // suspend has been called
    SUSPENDED,
    // terminate has been called
    TERMINATED,
    // destroy has been called, suspend and terminate will be rejected from now on
    DESTROYED
  }
139
140  // Since suspend and terminate should only be called within onNext or onHeartbeat(see the comments
141  // of RawScanResultConsumer.onNext and onHeartbeat), we need to add some check to prevent invalid
142  // usage. We use two things to prevent invalid usage:
  // 1. Record the thread that constructs the ScanControllerImpl instance. We will throw an
  // IllegalStateException if the caller thread is not this thread.
145  // 2. The ControllerState. The initial state is INITIALIZED, if you call suspend, the state will
146  // be transformed to SUSPENDED, and if you call terminate, the state will be transformed to
147  // TERMINATED. And when we are back from onNext or onHeartbeat in the onComplete method, we will
148  // call destroy to get the current state and set the state to DESTROYED. And when user calls
149  // suspend or terminate, we will check if the current state is INITIALIZED, if not we will throw
150  // an IllegalStateException. Notice that the DESTROYED state is necessary as you may not call
151  // suspend or terminate so the state will still be INITIALIZED when back from onNext or
152  // onHeartbeat. We need another state to replace the INITIALIZED state to prevent the controller
153  // to be used in the future.
  // Notice that the public methods of this class are supposed to be called by upper layer only,
  // and package private methods can only be called within the implementation of
  // AsyncScanSingleRegionRpcRetryingCaller.
157  private final class ScanControllerImpl implements AdvancedScanResultConsumer.ScanController {
158
159    // Make sure the methods are only called in this thread.
160    private final Thread callerThread;
161
162    private final Optional<Cursor> cursor;
163
164    // INITIALIZED -> SUSPENDED -> DESTROYED
165    // INITIALIZED -> TERMINATED -> DESTROYED
166    // INITIALIZED -> DESTROYED
167    // If the state is incorrect we will throw IllegalStateException.
168    private ScanControllerState state = ScanControllerState.INITIALIZED;
169
170    private ScanResumerImpl resumer;
171
172    public ScanControllerImpl(Optional<Cursor> cursor) {
173      this.callerThread = Thread.currentThread();
174      this.cursor = cursor;
175    }
176
177    private void preCheck() {
178      Preconditions.checkState(Thread.currentThread() == callerThread,
179        "The current thread is %s, expected thread is %s, "
180          + "you should not call this method outside onNext or onHeartbeat",
181        Thread.currentThread(), callerThread);
182      Preconditions.checkState(state.equals(ScanControllerState.INITIALIZED),
183        "Invalid Stopper state %s", state);
184    }
185
186    @Override
187    public ScanResumer suspend() {
188      preCheck();
189      state = ScanControllerState.SUSPENDED;
190      ScanResumerImpl resumer = new ScanResumerImpl();
191      this.resumer = resumer;
192      return resumer;
193    }
194
195    @Override
196    public void terminate() {
197      preCheck();
198      state = ScanControllerState.TERMINATED;
199    }
200
201    // return the current state, and set the state to DESTROYED.
202    ScanControllerState destroy() {
203      ScanControllerState state = this.state;
204      this.state = ScanControllerState.DESTROYED;
205      return state;
206    }
207
208    @Override
209    public Optional<Cursor> cursor() {
210      return cursor;
211    }
212  }
213
  // The states of a ScanResumerImpl.
  private enum ScanResumerState {
    // the initial state, neither prepare nor resume/terminate has taken effect
    INITIALIZED,
    // prepare has recorded the response and the scan is suspended
    SUSPENDED,
    // resume or terminate has been called
    RESUMED
  }
219
220  // The resume method is allowed to be called in another thread so here we also use the
221  // ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back
222  // from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED,
223  // and when user calls resume method, we will change the state to RESUMED. But the resume method
224  // could be called in other thread, and in fact, user could just do this:
225  // controller.suspend().resume()
226  // This is strange but valid. This means the scan could be resumed before we call the prepare
  // method to do the actual suspend work. So in the resume method, we will check if the state is
  // INITIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare
  // method, if the state is RESUMED already, we will just return and let the scan go on.
  // Notice that the public methods of this class are supposed to be called by upper layer only,
  // and package private methods can only be called within the implementation of
  // AsyncScanSingleRegionRpcRetryingCaller.
233  @InterfaceAudience.Private
234  final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {
235
236    // INITIALIZED -> SUSPENDED -> RESUMED
237    // INITIALIZED -> RESUMED
238    private ScanResumerState state = ScanResumerState.INITIALIZED;
239
240    private ScanResponse resp;
241
242    private int numberOfCompleteRows;
243
244    // If the scan is suspended successfully, we need to do lease renewal to prevent it being closed
245    // by RS due to lease expire. It is a one-time timer task so we need to schedule a new task
246    // every time when the previous task is finished. There could also be race as the renewal is
247    // executed in the timer thread, so we also need to check the state before lease renewal. If the
248    // state is RESUMED already, we will give up lease renewal and also not schedule the next lease
249    // renewal task.
250    private Timeout leaseRenewer;
251
252    @Override
253    public void resume() {
254      doResume(false);
255    }
256
257    /**
258     * This method is used when {@link ScanControllerImpl#suspend} had ever been called to get a
259     * {@link ScanResumerImpl}, but now user stops scan and does not need any more scan results.
260     */
261    public void terminate() {
262      doResume(true);
263    }
264
265    private void doResume(boolean stopScan) {
266      // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
267      // just return at the first if condition without loading the resp and numValidResuls field. If
268      // resume is called after suspend, then it is also safe to just reference resp and
269      // numValidResults after the synchronized block as no one will change it anymore.
270      ScanResponse localResp;
271      int localNumberOfCompleteRows;
272      synchronized (this) {
273        if (state == ScanResumerState.INITIALIZED) {
274          // user calls this method before we call prepare, so just set the state to
275          // RESUMED, the implementation will just go on.
276          state = ScanResumerState.RESUMED;
277          return;
278        }
279        if (state == ScanResumerState.RESUMED) {
280          // already resumed, give up.
281          return;
282        }
283        state = ScanResumerState.RESUMED;
284        if (leaseRenewer != null) {
285          leaseRenewer.cancel();
286        }
287        localResp = this.resp;
288        localNumberOfCompleteRows = this.numberOfCompleteRows;
289      }
290      if (stopScan) {
291        stopScan(localResp);
292      } else {
293        completeOrNext(localResp, localNumberOfCompleteRows);
294      }
295    }
296
297    private void scheduleRenewLeaseTask() {
298      leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2,
299        TimeUnit.NANOSECONDS);
300    }
301
302    private synchronized void tryRenewLease() {
303      // the scan has already been resumed, give up
304      if (state == ScanResumerState.RESUMED) {
305        return;
306      }
307      renewLease();
308      // schedule the next renew lease task again as this is a one-time task.
309      scheduleRenewLeaseTask();
310    }
311
312    // return false if the scan has already been resumed. See the comment above for ScanResumerImpl
313    // for more details.
314    synchronized boolean prepare(ScanResponse resp, int numberOfCompleteRows) {
315      if (state == ScanResumerState.RESUMED) {
316        // user calls resume before we actually suspend the scan, just continue;
317        return false;
318      }
319      state = ScanResumerState.SUSPENDED;
320      this.resp = resp;
321      this.numberOfCompleteRows = numberOfCompleteRows;
322      // if there are no more results in region then the scanner at RS side will be closed
323      // automatically so we do not need to renew lease.
324      if (resp.getMoreResultsInRegion()) {
325        // schedule renew lease task
326        scheduleRenewLeaseTask();
327      }
328      return true;
329    }
330  }
331
332  public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
333    Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache,
334    AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
335    boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs,
336    long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
337    int startLogErrorsCnt, Map<String, byte[]> requestAttributes) {
338    this.retryTimer = retryTimer;
339    this.conn = conn;
340    this.scan = scan;
341    this.scanMetrics = scanMetrics;
342    this.scannerId = scannerId;
343    this.resultCache = resultCache;
344    this.consumer = consumer;
345    this.stub = stub;
346    this.loc = loc;
347    this.regionServerRemote = isRegionServerRemote;
348    this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
349    this.maxAttempts = maxAttempts;
350    this.scanTimeoutNs = scanTimeoutNs;
351    this.rpcTimeoutNs = rpcTimeoutNs;
352    this.startLogErrorsCnt = startLogErrorsCnt;
353    if (scan.isReversed()) {
354      completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion;
355    } else {
356      completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion;
357    }
358    this.future = new CompletableFuture<>();
359    this.priority = priority;
360    this.controller = conn.rpcControllerFactory.newController();
361    this.controller.setPriority(priority);
362    this.controller.setRequestAttributes(requestAttributes);
363    this.exceptions = new ArrayList<>();
364    this.pauseManager =
365      new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, scanTimeoutNs);
366  }
367
368  private long elapsedMs() {
369    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
370  }
371
372  private void closeScanner() {
373    incRPCCallsMetrics(scanMetrics, regionServerRemote);
374    resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS, loc.getRegion().getTable());
375    ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
376    stub.scan(controller, req, resp -> {
377      if (controller.failed()) {
378        LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId
379          + " for " + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable()
380          + " failed, ignore, probably already closed", controller.getFailed());
381      }
382    });
383  }
384
385  private void completeExceptionally(boolean closeScanner) {
386    resultCache.clear();
387    if (closeScanner) {
388      closeScanner();
389    }
390    future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
391  }
392
393  private void completeNoMoreResults() {
394    future.complete(false);
395  }
396
397  private void completeWithNextStartRow(byte[] row, boolean inclusive) {
398    scan.withStartRow(row, inclusive);
399    future.complete(true);
400  }
401
402  private void completeWhenError(boolean closeScanner) {
403    incRPCRetriesMetrics(scanMetrics, closeScanner);
404    resultCache.clear();
405    if (closeScanner) {
406      closeScanner();
407    }
408    if (nextStartRowWhenError != null) {
409      scan.withStartRow(nextStartRowWhenError, includeNextStartRowWhenError);
410    }
411    future.complete(true);
412  }
413
414  private void onError(Throwable error) {
415    error = translateException(error);
416    if (tries > startLogErrorsCnt) {
417      LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for "
418        + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable()
419        + " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
420        + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs()
421        + " ms", error);
422    }
423    boolean scannerClosed =
424      error instanceof UnknownScannerException || error instanceof NotServingRegionException
425        || error instanceof RegionServerStoppedException || error instanceof ScannerResetException;
426    RetriesExhaustedException.ThrowableWithExtraContext qt =
427      new RetriesExhaustedException.ThrowableWithExtraContext(error,
428        EnvironmentEdgeManager.currentTime(), "");
429    exceptions.add(qt);
430    if (tries >= maxAttempts) {
431      completeExceptionally(!scannerClosed);
432      return;
433    }
434
435    OptionalLong maybePauseNsToUse =
436      pauseManager.getPauseNsFromException(error, tries, nextCallStartNs);
437    if (!maybePauseNsToUse.isPresent()) {
438      completeExceptionally(!scannerClosed);
439      return;
440    }
441    long delayNs = maybePauseNsToUse.getAsLong();
442    if (scannerClosed) {
443      completeWhenError(false);
444      return;
445    }
446    if (error instanceof OutOfOrderScannerNextException) {
447      completeWhenError(true);
448      return;
449    }
450    if (error instanceof DoNotRetryIOException) {
451      completeExceptionally(true);
452      return;
453    }
454    tries++;
455    if (HBaseServerException.isServerOverloaded(error)) {
456      Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
457      metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
458    }
459    retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
460  }
461
462  private void updateNextStartRowWhenError(Result result) {
463    nextStartRowWhenError = result.getRow();
464    includeNextStartRowWhenError = result.mayHaveMoreCellsInRow();
465  }
466
467  private void completeWhenNoMoreResultsInRegion() {
468    if (noMoreResultsForScan(scan, loc.getRegion())) {
469      completeNoMoreResults();
470    } else {
471      completeWithNextStartRow(loc.getRegion().getEndKey(), true);
472    }
473  }
474
475  private void completeReversedWhenNoMoreResultsInRegion() {
476    if (noMoreResultsForReverseScan(scan, loc.getRegion())) {
477      completeNoMoreResults();
478    } else {
479      completeWithNextStartRow(loc.getRegion().getStartKey(), false);
480    }
481  }
482
483  private void completeOrNext(ScanResponse resp, int numberOfCompleteRows) {
484    if (resp.hasMoreResults() && !resp.getMoreResults()) {
485      // RS tells us there is no more data for the whole scan
486      completeNoMoreResults();
487      return;
488    }
489    if (scan.getLimit() > 0) {
490      // The RS should have set the moreResults field in ScanResponse to false when we have reached
491      // the limit, so we add an assert here.
492      int newLimit = scan.getLimit() - numberOfCompleteRows;
493      assert newLimit > 0;
494      scan.setLimit(newLimit);
495    }
496    // as in 2.0 this value will always be set
497    if (!resp.getMoreResultsInRegion()) {
498      completeWhenNoMoreResultsInRegion.run();
499      return;
500    }
501    next();
502  }
503
504  private void onComplete(HBaseRpcController controller, ScanResponse resp) {
505    if (controller.failed()) {
506      onError(controller.getFailed());
507      return;
508    }
509    updateServerSideMetrics(scanMetrics, resp);
510    boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
511    Result[] rawResults;
512    Result[] results;
513    int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows();
514    try {
515      rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
516      updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage);
517      results = resultCache.addAndGet(
518        Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
519        isHeartbeatMessage);
520    } catch (IOException e) {
521      // We can not retry here. The server has responded normally and the call sequence has been
522      // increased so a new scan with the same call sequence will cause an
523      // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
524      LOG.warn("decode scan response failed", e);
525      completeWhenError(true);
526      return;
527    }
528
529    ScanControllerImpl scanController;
530    if (results.length > 0) {
531      scanController = new ScanControllerImpl(
532        resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor())) : Optional.empty());
533      updateNextStartRowWhenError(results[results.length - 1]);
534      consumer.onNext(results, scanController);
535    } else {
536      Optional<Cursor> cursor = Optional.empty();
537      if (resp.hasCursor()) {
538        cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor()));
539      } else if (scan.isNeedCursorResult() && rawResults.length > 0) {
540        // It is size limit exceed and we need to return the last Result's row.
541        // When user setBatch and the scanner is reopened, the server may return Results that
542        // user has seen and the last Result can not be seen because the number is not enough.
543        // So the row keys of results may not be same, we must use the last one.
544        cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow()));
545      }
546      scanController = new ScanControllerImpl(cursor);
547      if (isHeartbeatMessage || cursor.isPresent()) {
548        // only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we
549        // want to pass a cursor to upper layer.
550        consumer.onHeartbeat(scanController);
551      }
552    }
553    ScanControllerState state = scanController.destroy();
554    if (state == ScanControllerState.TERMINATED) {
555      stopScan(resp);
556      return;
557    }
558    int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
559    if (state == ScanControllerState.SUSPENDED) {
560      if (scanController.resumer.prepare(resp, numberOfCompleteRows)) {
561        return;
562      }
563    }
564    completeOrNext(resp, numberOfCompleteRows);
565  }
566
567  private void stopScan(ScanResponse resp) {
568    if (resp.getMoreResultsInRegion()) {
569      // we have more results in region but user request to stop the scan, so we need to close the
570      // scanner explicitly.
571      closeScanner();
572    }
573    completeNoMoreResults();
574  }
575
576  private void call() {
577    // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
578    // less than the scan timeout. If the server does not respond in time(usually this will not
579    // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
580    // resending the next request and the only way to fix this is to close the scanner and open a
581    // new one.
582    long callTimeoutNs;
583    if (scanTimeoutNs > 0) {
584      long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
585      if (remainingNs <= 0) {
586        completeExceptionally(true);
587        return;
588      }
589      callTimeoutNs = remainingNs;
590    } else {
591      callTimeoutNs = 0L;
592    }
593    incRPCCallsMetrics(scanMetrics, regionServerRemote);
594    if (tries > 1) {
595      incRPCRetriesMetrics(scanMetrics, regionServerRemote);
596    }
597    resetController(controller, callTimeoutNs, priority, loc.getRegion().getTable());
598    ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
599      nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
600    final Context context = Context.current();
601    stub.scan(controller, req, resp -> {
602      try (Scope ignored = context.makeCurrent()) {
603        onComplete(controller, resp);
604      }
605    });
606  }
607
608  private void next() {
609    nextCallSeq++;
610    tries = 1;
611    exceptions.clear();
612    nextCallStartNs = System.nanoTime();
613    call();
614  }
615
616  private void renewLease() {
617    incRPCCallsMetrics(scanMetrics, regionServerRemote);
618    nextCallSeq++;
619    resetController(controller, rpcTimeoutNs, priority, loc.getRegion().getTable());
620    ScanRequest req =
621      RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1);
622    stub.scan(controller, req, resp -> {
623    });
624  }
625
626  /**
627   * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also
628   * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the
629   * open scanner request is also needed because we may have some data in the CellScanner which is
630   * contained in the controller.
631   * @return {@code true} if we should continue, otherwise {@code false}.
632   */
633  public CompletableFuture<Boolean> start(HBaseRpcController controller,
634    ScanResponse respWhenOpen) {
635    onComplete(controller, respWhenOpen);
636    return future;
637  }
638}