001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; 026import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics; 027import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics; 028 029import io.opentelemetry.context.Context; 030import io.opentelemetry.context.Scope; 031import java.io.IOException; 032import java.util.ArrayList; 033import java.util.List; 034import java.util.Map; 035import java.util.Optional; 036import java.util.OptionalLong; 037import java.util.concurrent.CompletableFuture; 038import java.util.concurrent.TimeUnit; 039import org.apache.hadoop.hbase.DoNotRetryIOException; 040import org.apache.hadoop.hbase.HBaseServerException; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.HRegionLocation; 043import org.apache.hadoop.hbase.NotServingRegionException; 044import org.apache.hadoop.hbase.UnknownScannerException; 045import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer; 046import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager; 047import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 048import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 049import org.apache.hadoop.hbase.exceptions.ScannerResetException; 050import org.apache.hadoop.hbase.ipc.HBaseRpcController; 051import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 053import org.apache.yetus.audience.InterfaceAudience; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 058import org.apache.hbase.thirdparty.io.netty.util.Timeout; 059import org.apache.hbase.thirdparty.io.netty.util.Timer; 060 061import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 062import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 063import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; 066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 067import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 068 069/** 070 * Retry caller for scanning a region. 071 * <p> 072 * We will modify the {@link Scan} object passed in directly. The upper layer should store the 073 * reference of this object and use it to open new single region scanners. 074 */ 075@InterfaceAudience.Private 076class AsyncScanSingleRegionRpcRetryingCaller { 077 078 private static final Logger LOG = 079 LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class); 080 081 private final Timer retryTimer; 082 083 private final Scan scan; 084 085 private final ScanMetrics scanMetrics; 086 087 private final long scannerId; 088 089 private final ScanResultCache resultCache; 090 091 private final AdvancedScanResultConsumer consumer; 092 093 private final ClientService.Interface stub; 094 095 private final HRegionLocation loc; 096 097 private final boolean regionServerRemote; 098 099 private final int priority; 100 101 private final long scannerLeaseTimeoutPeriodNs; 102 103 private final int maxAttempts; 104 105 private final long scanTimeoutNs; 106 107 private final long rpcTimeoutNs; 108 109 private final int startLogErrorsCnt; 110 111 private final Runnable completeWhenNoMoreResultsInRegion; 112 113 protected final AsyncConnectionImpl conn; 114 115 private final CompletableFuture<Boolean> future; 116 117 private final HBaseRpcController controller; 118 119 private byte[] nextStartRowWhenError; 120 121 private boolean includeNextStartRowWhenError; 122 123 private long nextCallStartNs; 124 125 private int tries; 126 127 private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions; 128 129 private long nextCallSeq = -1L; 130 131 private final HBaseServerExceptionPauseManager pauseManager; 132 133 private enum ScanControllerState { 134 INITIALIZED, 135 SUSPENDED, 136 TERMINATED, 137 DESTROYED 138 } 139 140 // Since suspend and terminate should only be called within onNext or onHeartbeat(see the comments 141 // of RawScanResultConsumer.onNext and onHeartbeat), we need to add some check to prevent invalid 142 // usage. We use two things to prevent invalid usage: 143 // 1. Record the thread that construct the ScanControllerImpl instance. We will throw an 144 // IllegalStateException if the caller thread is not this thread. 145 // 2. The ControllerState. The initial state is INITIALIZED, if you call suspend, the state will 146 // be transformed to SUSPENDED, and if you call terminate, the state will be transformed to 147 // TERMINATED. And when we are back from onNext or onHeartbeat in the onComplete method, we will 148 // call destroy to get the current state and set the state to DESTROYED. And when user calls 149 // suspend or terminate, we will check if the current state is INITIALIZED, if not we will throw 150 // an IllegalStateException. Notice that the DESTROYED state is necessary as you may not call 151 // suspend or terminate so the state will still be INITIALIZED when back from onNext or 152 // onHeartbeat. We need another state to replace the INITIALIZED state to prevent the controller 153 // to be used in the future. 154 // Notice that, the public methods of this class is supposed to be called by upper layer only, and 155 // package private methods can only be called within the implementation of 156 // AsyncScanSingleRegionRpcRetryingCaller. 157 private final class ScanControllerImpl implements AdvancedScanResultConsumer.ScanController { 158 159 // Make sure the methods are only called in this thread. 160 private final Thread callerThread; 161 162 private final Optional<Cursor> cursor; 163 164 // INITIALIZED -> SUSPENDED -> DESTROYED 165 // INITIALIZED -> TERMINATED -> DESTROYED 166 // INITIALIZED -> DESTROYED 167 // If the state is incorrect we will throw IllegalStateException. 168 private ScanControllerState state = ScanControllerState.INITIALIZED; 169 170 private ScanResumerImpl resumer; 171 172 public ScanControllerImpl(Optional<Cursor> cursor) { 173 this.callerThread = Thread.currentThread(); 174 this.cursor = cursor; 175 } 176 177 private void preCheck() { 178 Preconditions.checkState(Thread.currentThread() == callerThread, 179 "The current thread is %s, expected thread is %s, " 180 + "you should not call this method outside onNext or onHeartbeat", 181 Thread.currentThread(), callerThread); 182 Preconditions.checkState(state.equals(ScanControllerState.INITIALIZED), 183 "Invalid Stopper state %s", state); 184 } 185 186 @Override 187 public ScanResumer suspend() { 188 preCheck(); 189 state = ScanControllerState.SUSPENDED; 190 ScanResumerImpl resumer = new ScanResumerImpl(); 191 this.resumer = resumer; 192 return resumer; 193 } 194 195 @Override 196 public void terminate() { 197 preCheck(); 198 state = ScanControllerState.TERMINATED; 199 } 200 201 // return the current state, and set the state to DESTROYED. 202 ScanControllerState destroy() { 203 ScanControllerState state = this.state; 204 this.state = ScanControllerState.DESTROYED; 205 return state; 206 } 207 208 @Override 209 public Optional<Cursor> cursor() { 210 return cursor; 211 } 212 } 213 214 private enum ScanResumerState { 215 INITIALIZED, 216 SUSPENDED, 217 RESUMED 218 } 219 220 // The resume method is allowed to be called in another thread so here we also use the 221 // ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back 222 // from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED, 223 // and when user calls resume method, we will change the state to RESUMED. But the resume method 224 // could be called in other thread, and in fact, user could just do this: 225 // controller.suspend().resume() 226 // This is strange but valid. This means the scan could be resumed before we call the prepare 227 // method to do the actual suspend work. So in the resume method, we will check if the state is 228 // INTIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare 229 // method, if the state is RESUMED already, we will just return an let the scan go on. 230 // Notice that, the public methods of this class is supposed to be called by upper layer only, and 231 // package private methods can only be called within the implementation of 232 // AsyncScanSingleRegionRpcRetryingCaller. 233 @InterfaceAudience.Private 234 final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer { 235 236 // INITIALIZED -> SUSPENDED -> RESUMED 237 // INITIALIZED -> RESUMED 238 private ScanResumerState state = ScanResumerState.INITIALIZED; 239 240 private ScanResponse resp; 241 242 private int numberOfCompleteRows; 243 244 // If the scan is suspended successfully, we need to do lease renewal to prevent it being closed 245 // by RS due to lease expire. It is a one-time timer task so we need to schedule a new task 246 // every time when the previous task is finished. There could also be race as the renewal is 247 // executed in the timer thread, so we also need to check the state before lease renewal. If the 248 // state is RESUMED already, we will give up lease renewal and also not schedule the next lease 249 // renewal task. 250 private Timeout leaseRenewer; 251 252 @Override 253 public void resume() { 254 doResume(false); 255 } 256 257 /** 258 * This method is used when {@link ScanControllerImpl#suspend} had ever been called to get a 259 * {@link ScanResumerImpl}, but now user stops scan and does not need any more scan results. 260 */ 261 public void terminate() { 262 doResume(true); 263 } 264 265 private void doResume(boolean stopScan) { 266 // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we 267 // just return at the first if condition without loading the resp and numValidResuls field. If 268 // resume is called after suspend, then it is also safe to just reference resp and 269 // numValidResults after the synchronized block as no one will change it anymore. 270 ScanResponse localResp; 271 int localNumberOfCompleteRows; 272 synchronized (this) { 273 if (state == ScanResumerState.INITIALIZED) { 274 // user calls this method before we call prepare, so just set the state to 275 // RESUMED, the implementation will just go on. 276 state = ScanResumerState.RESUMED; 277 return; 278 } 279 if (state == ScanResumerState.RESUMED) { 280 // already resumed, give up. 281 return; 282 } 283 state = ScanResumerState.RESUMED; 284 if (leaseRenewer != null) { 285 leaseRenewer.cancel(); 286 } 287 localResp = this.resp; 288 localNumberOfCompleteRows = this.numberOfCompleteRows; 289 } 290 if (stopScan) { 291 stopScan(localResp); 292 } else { 293 completeOrNext(localResp, localNumberOfCompleteRows); 294 } 295 } 296 297 private void scheduleRenewLeaseTask() { 298 leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2, 299 TimeUnit.NANOSECONDS); 300 } 301 302 private synchronized void tryRenewLease() { 303 // the scan has already been resumed, give up 304 if (state == ScanResumerState.RESUMED) { 305 return; 306 } 307 renewLease(); 308 // schedule the next renew lease task again as this is a one-time task. 309 scheduleRenewLeaseTask(); 310 } 311 312 // return false if the scan has already been resumed. See the comment above for ScanResumerImpl 313 // for more details. 314 synchronized boolean prepare(ScanResponse resp, int numberOfCompleteRows) { 315 if (state == ScanResumerState.RESUMED) { 316 // user calls resume before we actually suspend the scan, just continue; 317 return false; 318 } 319 state = ScanResumerState.SUSPENDED; 320 this.resp = resp; 321 this.numberOfCompleteRows = numberOfCompleteRows; 322 // if there are no more results in region then the scanner at RS side will be closed 323 // automatically so we do not need to renew lease. 324 if (resp.getMoreResultsInRegion()) { 325 // schedule renew lease task 326 scheduleRenewLeaseTask(); 327 } 328 return true; 329 } 330 } 331 332 public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, 333 Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache, 334 AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc, 335 boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs, 336 long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, 337 int startLogErrorsCnt, Map<String, byte[]> requestAttributes) { 338 this.retryTimer = retryTimer; 339 this.conn = conn; 340 this.scan = scan; 341 this.scanMetrics = scanMetrics; 342 this.scannerId = scannerId; 343 this.resultCache = resultCache; 344 this.consumer = consumer; 345 this.stub = stub; 346 this.loc = loc; 347 this.regionServerRemote = isRegionServerRemote; 348 this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs; 349 this.maxAttempts = maxAttempts; 350 this.scanTimeoutNs = scanTimeoutNs; 351 this.rpcTimeoutNs = rpcTimeoutNs; 352 this.startLogErrorsCnt = startLogErrorsCnt; 353 if (scan.isReversed()) { 354 completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion; 355 } else { 356 completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion; 357 } 358 this.future = new CompletableFuture<>(); 359 this.priority = priority; 360 this.controller = conn.rpcControllerFactory.newController(); 361 this.controller.setPriority(priority); 362 this.controller.setRequestAttributes(requestAttributes); 363 this.exceptions = new ArrayList<>(); 364 this.pauseManager = 365 new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, scanTimeoutNs); 366 } 367 368 private long elapsedMs() { 369 return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs); 370 } 371 372 private void closeScanner() { 373 incRPCCallsMetrics(scanMetrics, regionServerRemote); 374 resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS, loc.getRegion().getTable()); 375 ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false); 376 stub.scan(controller, req, resp -> { 377 if (controller.failed()) { 378 LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId 379 + " for " + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable() 380 + " failed, ignore, probably already closed", controller.getFailed()); 381 } 382 }); 383 } 384 385 private void completeExceptionally(boolean closeScanner) { 386 resultCache.clear(); 387 if (closeScanner) { 388 closeScanner(); 389 } 390 future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions)); 391 } 392 393 private void completeNoMoreResults() { 394 future.complete(false); 395 } 396 397 private void completeWithNextStartRow(byte[] row, boolean inclusive) { 398 scan.withStartRow(row, inclusive); 399 future.complete(true); 400 } 401 402 private void completeWhenError(boolean closeScanner) { 403 incRPCRetriesMetrics(scanMetrics, closeScanner); 404 resultCache.clear(); 405 if (closeScanner) { 406 closeScanner(); 407 } 408 if (nextStartRowWhenError != null) { 409 scan.withStartRow(nextStartRowWhenError, includeNextStartRowWhenError); 410 } 411 future.complete(true); 412 } 413 414 private void onError(Throwable error) { 415 error = translateException(error); 416 if (tries > startLogErrorsCnt) { 417 LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " 418 + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable() 419 + " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " 420 + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() 421 + " ms", error); 422 } 423 boolean scannerClosed = 424 error instanceof UnknownScannerException || error instanceof NotServingRegionException 425 || error instanceof RegionServerStoppedException || error instanceof ScannerResetException; 426 RetriesExhaustedException.ThrowableWithExtraContext qt = 427 new RetriesExhaustedException.ThrowableWithExtraContext(error, 428 EnvironmentEdgeManager.currentTime(), ""); 429 exceptions.add(qt); 430 if (tries >= maxAttempts) { 431 completeExceptionally(!scannerClosed); 432 return; 433 } 434 435 OptionalLong maybePauseNsToUse = 436 pauseManager.getPauseNsFromException(error, tries, nextCallStartNs); 437 if (!maybePauseNsToUse.isPresent()) { 438 completeExceptionally(!scannerClosed); 439 return; 440 } 441 long delayNs = maybePauseNsToUse.getAsLong(); 442 if (scannerClosed) { 443 completeWhenError(false); 444 return; 445 } 446 if (error instanceof OutOfOrderScannerNextException) { 447 completeWhenError(true); 448 return; 449 } 450 if (error instanceof DoNotRetryIOException) { 451 completeExceptionally(true); 452 return; 453 } 454 tries++; 455 if (HBaseServerException.isServerOverloaded(error)) { 456 Optional<MetricsConnection> metrics = conn.getConnectionMetrics(); 457 metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS)); 458 } 459 retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS); 460 } 461 462 private void updateNextStartRowWhenError(Result result) { 463 nextStartRowWhenError = result.getRow(); 464 includeNextStartRowWhenError = result.mayHaveMoreCellsInRow(); 465 } 466 467 private void completeWhenNoMoreResultsInRegion() { 468 if (noMoreResultsForScan(scan, loc.getRegion())) { 469 completeNoMoreResults(); 470 } else { 471 completeWithNextStartRow(loc.getRegion().getEndKey(), true); 472 } 473 } 474 475 private void completeReversedWhenNoMoreResultsInRegion() { 476 if (noMoreResultsForReverseScan(scan, loc.getRegion())) { 477 completeNoMoreResults(); 478 } else { 479 completeWithNextStartRow(loc.getRegion().getStartKey(), false); 480 } 481 } 482 483 private void completeOrNext(ScanResponse resp, int numberOfCompleteRows) { 484 if (resp.hasMoreResults() && !resp.getMoreResults()) { 485 // RS tells us there is no more data for the whole scan 486 completeNoMoreResults(); 487 return; 488 } 489 if (scan.getLimit() > 0) { 490 // The RS should have set the moreResults field in ScanResponse to false when we have reached 491 // the limit, so we add an assert here. 492 int newLimit = scan.getLimit() - numberOfCompleteRows; 493 assert newLimit > 0; 494 scan.setLimit(newLimit); 495 } 496 // as in 2.0 this value will always be set 497 if (!resp.getMoreResultsInRegion()) { 498 completeWhenNoMoreResultsInRegion.run(); 499 return; 500 } 501 next(); 502 } 503 504 private void onComplete(HBaseRpcController controller, ScanResponse resp) { 505 if (controller.failed()) { 506 onError(controller.getFailed()); 507 return; 508 } 509 updateServerSideMetrics(scanMetrics, resp); 510 boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage(); 511 Result[] rawResults; 512 Result[] results; 513 int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows(); 514 try { 515 rawResults = ResponseConverter.getResults(controller.cellScanner(), resp); 516 updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage); 517 results = resultCache.addAndGet( 518 Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY), 519 isHeartbeatMessage); 520 } catch (IOException e) { 521 // We can not retry here. The server has responded normally and the call sequence has been 522 // increased so a new scan with the same call sequence will cause an 523 // OutOfOrderScannerNextException. Let the upper layer open a new scanner. 524 LOG.warn("decode scan response failed", e); 525 completeWhenError(true); 526 return; 527 } 528 529 ScanControllerImpl scanController; 530 if (results.length > 0) { 531 scanController = new ScanControllerImpl( 532 resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor())) : Optional.empty()); 533 updateNextStartRowWhenError(results[results.length - 1]); 534 consumer.onNext(results, scanController); 535 } else { 536 Optional<Cursor> cursor = Optional.empty(); 537 if (resp.hasCursor()) { 538 cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor())); 539 } else if (scan.isNeedCursorResult() && rawResults.length > 0) { 540 // It is size limit exceed and we need to return the last Result's row. 541 // When user setBatch and the scanner is reopened, the server may return Results that 542 // user has seen and the last Result can not be seen because the number is not enough. 543 // So the row keys of results may not be same, we must use the last one. 544 cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow())); 545 } 546 scanController = new ScanControllerImpl(cursor); 547 if (isHeartbeatMessage || cursor.isPresent()) { 548 // only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we 549 // want to pass a cursor to upper layer. 550 consumer.onHeartbeat(scanController); 551 } 552 } 553 ScanControllerState state = scanController.destroy(); 554 if (state == ScanControllerState.TERMINATED) { 555 stopScan(resp); 556 return; 557 } 558 int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore; 559 if (state == ScanControllerState.SUSPENDED) { 560 if (scanController.resumer.prepare(resp, numberOfCompleteRows)) { 561 return; 562 } 563 } 564 completeOrNext(resp, numberOfCompleteRows); 565 } 566 567 private void stopScan(ScanResponse resp) { 568 if (resp.getMoreResultsInRegion()) { 569 // we have more results in region but user request to stop the scan, so we need to close the 570 // scanner explicitly. 571 closeScanner(); 572 } 573 completeNoMoreResults(); 574 } 575 576 private void call() { 577 // As we have a call sequence for scan, it is useless to have a different rpc timeout which is 578 // less than the scan timeout. If the server does not respond in time(usually this will not 579 // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when 580 // resending the next request and the only way to fix this is to close the scanner and open a 581 // new one. 582 long callTimeoutNs; 583 if (scanTimeoutNs > 0) { 584 long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs); 585 if (remainingNs <= 0) { 586 completeExceptionally(true); 587 return; 588 } 589 callTimeoutNs = remainingNs; 590 } else { 591 callTimeoutNs = 0L; 592 } 593 incRPCCallsMetrics(scanMetrics, regionServerRemote); 594 if (tries > 1) { 595 incRPCRetriesMetrics(scanMetrics, regionServerRemote); 596 } 597 resetController(controller, callTimeoutNs, priority, loc.getRegion().getTable()); 598 ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false, 599 nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit()); 600 final Context context = Context.current(); 601 stub.scan(controller, req, resp -> { 602 try (Scope ignored = context.makeCurrent()) { 603 onComplete(controller, resp); 604 } 605 }); 606 } 607 608 private void next() { 609 nextCallSeq++; 610 tries = 1; 611 exceptions.clear(); 612 nextCallStartNs = System.nanoTime(); 613 call(); 614 } 615 616 private void renewLease() { 617 incRPCCallsMetrics(scanMetrics, regionServerRemote); 618 nextCallSeq++; 619 resetController(controller, rpcTimeoutNs, priority, loc.getRegion().getTable()); 620 ScanRequest req = 621 RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1); 622 stub.scan(controller, req, resp -> { 623 }); 624 } 625 626 /** 627 * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also 628 * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the 629 * open scanner request is also needed because we may have some data in the CellScanner which is 630 * contained in the controller. 631 * @return {@code true} if we should continue, otherwise {@code false}. 632 */ 633 public CompletableFuture<Boolean> start(HBaseRpcController controller, 634 ScanResponse respWhenOpen) { 635 onComplete(controller, respWhenOpen); 636 return future; 637 } 638}