001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;
023
024import io.opentelemetry.api.trace.Span;
025import io.opentelemetry.api.trace.StatusCode;
026import io.opentelemetry.context.Scope;
027import java.io.IOException;
028import java.io.InterruptedIOException;
029import java.util.ArrayDeque;
030import java.util.Map;
031import java.util.Queue;
032import java.util.concurrent.ExecutorService;
033import org.apache.commons.lang3.mutable.MutableBoolean;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.DoNotRetryIOException;
036import org.apache.hadoop.hbase.HRegionInfo;
037import org.apache.hadoop.hbase.NotServingRegionException;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.UnknownScannerException;
040import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
041import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
042import org.apache.hadoop.hbase.exceptions.ScannerResetException;
043import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
044import org.apache.hadoop.hbase.regionserver.LeaseException;
045import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
048import org.apache.yetus.audience.InterfaceAudience;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
053
054/**
055 * Implements the scanner interface for the HBase client. If there are multiple regions in a table,
056 * this scanner will iterate through them all.
057 */
058@InterfaceAudience.Private
059public abstract class ClientScanner extends AbstractClientScanner {
060
061  private static final Logger LOG = LoggerFactory.getLogger(ClientScanner.class);
062
063  protected final Scan scan;
064  // We clone the original client Scan to avoid modifying user object from scan internals.
065  // The below scanForMetrics is the client's object, which we mutate only for returning
066  // ScanMetrics.
067  // See https://issues.apache.org/jira/browse/HBASE-27402.
068  private final Scan scanForMetrics;
069
070  protected boolean closed = false;
071  // Current region scanner is against. Gets cleared if current region goes
072  // wonky: e.g. if it splits on us.
073  protected HRegionInfo currentRegion = null;
074  protected ScannerCallableWithReplicas callable = null;
075  protected Queue<Result> cache;
076  private final ScanResultCache scanResultCache;
077  protected final int caching;
078  protected long lastNext;
079  // Keep lastResult returned successfully in case we have to reset scanner.
080  protected Result lastResult = null;
081  protected final long maxScannerResultSize;
082  private final ClusterConnection connection;
083  protected final TableName tableName;
084  protected final int readRpcTimeout;
085  protected final int scannerTimeout;
086  private final boolean useScannerTimeoutForNextCalls;
087  protected boolean scanMetricsPublished = false;
088  protected RpcRetryingCaller<Result[]> caller;
089  protected RpcControllerFactory rpcControllerFactory;
090  protected Configuration conf;
091  protected final Span span;
092  // The timeout on the primary. Applicable if there are multiple replicas for a region
093  // In that case, we will only wait for this much timeout on the primary before going
094  // to the replicas and trying the same scan. Note that the retries will still happen
095  // on each replica and the first successful results will be taken. A timeout of 0 is
096  // disallowed.
097  protected final int primaryOperationTimeout;
098  private int retries;
099  protected final ExecutorService pool;
100  protected final Map<String, byte[]> requestAttributes;
101
102  /**
103   * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
104   * row maybe changed changed.
105   * @param conf       The {@link Configuration} to use.
106   * @param scan       {@link Scan} to use in this scanner
107   * @param tableName  The table that we wish to scan
108   * @param connection Connection identifying the cluster
109   */
110  public ClientScanner(final Configuration conf, final Scan scan, final Scan scanForMetrics,
111    final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
112    RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
113    int scannerTimeout, int primaryOperationTimeout,
114    ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
115    throws IOException {
116    this.scanForMetrics = scanForMetrics;
117    if (LOG.isTraceEnabled()) {
118      LOG.trace(
119        "Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
120    }
121    this.scan = scan;
122    this.tableName = tableName;
123    this.lastNext = EnvironmentEdgeManager.currentTime();
124    this.connection = connection;
125    this.pool = pool;
126    this.primaryOperationTimeout = primaryOperationTimeout;
127    this.retries = connectionConfiguration.getRetriesNumber();
128    if (scan.getMaxResultSize() > 0) {
129      this.maxScannerResultSize = scan.getMaxResultSize();
130    } else {
131      this.maxScannerResultSize = connectionConfiguration.getScannerMaxResultSize();
132    }
133    this.readRpcTimeout = scanReadRpcTimeout;
134    this.scannerTimeout = scannerTimeout;
135    this.useScannerTimeoutForNextCalls = connectionConfiguration.isUseScannerTimeoutForNextCalls();
136    this.requestAttributes = requestAttributes;
137
138    // check if application wants to collect scan metrics
139    initScanMetrics(scan);
140
141    // Use the caching from the Scan. If not set, use the default cache setting for this table.
142    if (this.scan.getCaching() > 0) {
143      this.caching = this.scan.getCaching();
144    } else {
145      this.caching = connectionConfiguration.getScannerCaching();
146    }
147
148    this.caller = rpcFactory.<Result[]> newCaller();
149    this.rpcControllerFactory = controllerFactory;
150
151    this.conf = conf;
152    this.span = Span.current();
153
154    this.scanResultCache = createScanResultCache(scan);
155    initCache();
156  }
157
158  protected final int getScanReplicaId() {
159    return Math.max(scan.getReplicaId(), RegionReplicaUtil.DEFAULT_REPLICA_ID);
160  }
161
162  protected ClusterConnection getConnection() {
163    return this.connection;
164  }
165
166  protected TableName getTable() {
167    return this.tableName;
168  }
169
170  protected int getRetries() {
171    return this.retries;
172  }
173
174  protected int getScannerTimeout() {
175    return this.scannerTimeout;
176  }
177
178  protected Configuration getConf() {
179    return this.conf;
180  }
181
182  protected Scan getScan() {
183    return scan;
184  }
185
186  protected ExecutorService getPool() {
187    return pool;
188  }
189
190  protected int getPrimaryOperationTimeout() {
191    return primaryOperationTimeout;
192  }
193
194  protected int getCaching() {
195    return caching;
196  }
197
198  protected long getTimestamp() {
199    return lastNext;
200  }
201
202  protected long getMaxResultSize() {
203    return maxScannerResultSize;
204  }
205
206  private void closeScanner() throws IOException {
207    if (this.callable != null) {
208      this.callable.setClose();
209      call(callable, caller, scannerTimeout, false);
210      this.callable = null;
211    }
212  }
213
214  /**
215   * Will be called in moveToNextRegion when currentRegion is null. Abstract because for normal
216   * scan, we will start next scan from the endKey of the currentRegion, and for reversed scan, we
217   * will start next scan from the startKey of the currentRegion.
218   * @return {@code false} if we have reached the stop row. Otherwise {@code true}.
219   */
220  protected abstract boolean setNewStartKey();
221
222  /**
223   * Will be called in moveToNextRegion to create ScannerCallable. Abstract because for reversed
224   * scan we need to create a ReversedScannerCallable.
225   */
226  protected abstract ScannerCallable createScannerCallable();
227
228  /**
229   * Close the previous scanner and create a new ScannerCallable for the next scanner.
230   * <p>
231   * Marked as protected only because TestClientScanner need to override this method.
232   * @return false if we should terminate the scan. Otherwise
233   */
234  protected boolean moveToNextRegion() {
235    // Close the previous scanner if it's open
236    try {
237      closeScanner();
238    } catch (IOException e) {
239      // not a big deal continue
240      if (LOG.isDebugEnabled()) {
241        LOG.debug("close scanner for " + currentRegion + " failed", e);
242      }
243    }
244    if (currentRegion != null) {
245      if (!setNewStartKey()) {
246        return false;
247      }
248      scan.resetMvccReadPoint();
249      if (LOG.isTraceEnabled()) {
250        LOG.trace("Finished " + this.currentRegion);
251      }
252    }
253    if (LOG.isDebugEnabled() && this.currentRegion != null) {
254      // Only worth logging if NOT first region in scan.
255      LOG.debug(
256        "Advancing internal scanner to startKey at '" + Bytes.toStringBinary(scan.getStartRow())
257          + "', " + (scan.includeStartRow() ? "inclusive" : "exclusive"));
258    }
259    // clear the current region, we will set a new value to it after the first call of the new
260    // callable.
261    this.currentRegion = null;
262    this.callable = new ScannerCallableWithReplicas(getTable(), getConnection(),
263      createScannerCallable(), pool, primaryOperationTimeout, scan, getRetries(), readRpcTimeout,
264      scannerTimeout, useScannerTimeoutForNextCalls, caching, conf, caller);
265    this.callable.setCaching(this.caching);
266    incRegionCountMetrics(scanMetrics);
267    return true;
268  }
269
270  boolean isAnyRPCcancelled() {
271    return callable.isAnyRPCcancelled();
272  }
273
274  private Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller<Result[]> caller,
275    int scannerTimeout, boolean updateCurrentRegion) throws IOException {
276    if (Thread.interrupted()) {
277      throw new InterruptedIOException();
278    }
279    // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
280    // we do a callWithRetries
281    Result[] rrs = caller.callWithoutRetries(callable, scannerTimeout);
282    if (currentRegion == null && updateCurrentRegion) {
283      currentRegion = callable.getHRegionInfo();
284    }
285    return rrs;
286  }
287
288  /**
289   * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
290   * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics
291   * framework because it doesn't support multi-instances of the same metrics on the same machine;
292   * for scan/map reduce scenarios, we will have multiple scans running at the same time. By
293   * default, scan metrics are disabled; if the application wants to collect them, this behavior can
294   * be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)}
295   */
296  protected void writeScanMetrics() {
297    if (this.scanMetrics == null || scanMetricsPublished) {
298      return;
299    }
300    // Publish ScanMetrics to the Scan Object.
301    // As we have claimed in the comment of Scan.getScanMetrics, this relies on that user will not
302    // call ResultScanner.getScanMetrics and reset the ScanMetrics. Otherwise the metrics published
303    // to Scan will be messed up.
304    scanForMetrics.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA,
305      ProtobufUtil.toScanMetrics(scanMetrics, false).toByteArray());
306    scanMetricsPublished = true;
307  }
308
309  protected void initSyncCache() {
310    cache = new ArrayDeque<>();
311  }
312
313  protected Result nextWithSyncCache() throws IOException {
314    Result result = cache.poll();
315    if (result != null) {
316      return result;
317    }
318    // If there is nothing left in the cache and the scanner is closed,
319    // return a no-op
320    if (this.closed) {
321      return null;
322    }
323
324    loadCache();
325
326    // try again to load from cache
327    result = cache.poll();
328
329    // if we exhausted this scanner before calling close, write out the scan metrics
330    if (result == null) {
331      writeScanMetrics();
332    }
333    return result;
334  }
335
336  public int getCacheSize() {
337    return cache != null ? cache.size() : 0;
338  }
339
340  private boolean scanExhausted() {
341    return callable.moreResultsForScan() == MoreResults.NO;
342  }
343
344  private boolean regionExhausted(Result[] values) {
345    // 1. Not a heartbeat message and we get nothing, this means the region is exhausted. And in the
346    // old time we always return empty result for a open scanner operation so we add a check here to
347    // keep compatible with the old logic. Should remove the isOpenScanner in the future.
348    // 2. Server tells us that it has no more results for this region.
349    return (values.length == 0 && !callable.isHeartbeatMessage())
350      || callable.moreResultsInRegion() == MoreResults.NO;
351  }
352
353  private void closeScannerIfExhausted(boolean exhausted) throws IOException {
354    if (exhausted) {
355      closeScanner();
356    }
357  }
358
359  private void handleScanError(DoNotRetryIOException e,
360    MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws DoNotRetryIOException {
361    // An exception was thrown which makes any partial results that we were collecting
362    // invalid. The scanner will need to be reset to the beginning of a row.
363    scanResultCache.clear();
364
365    // Unfortunately, DNRIOE is used in two different semantics.
366    // (1) The first is to close the client scanner and bubble up the exception all the way
367    // to the application. This is preferred when the exception is really un-recoverable
368    // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
369    // bucket usually.
370    // (2) Second semantics is to close the current region scanner only, but continue the
371    // client scanner by overriding the exception. This is usually UnknownScannerException,
372    // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
373    // application-level ClientScanner has to continue without bubbling up the exception to
374    // the client. See RSRpcServices to see how it throws DNRIOE's.
375    // See also: HBASE-16604, HBASE-17187
376
377    // If exception is any but the list below throw it back to the client; else setup
378    // the scanner and retry.
379    Throwable cause = e.getCause();
380    if (
381      (cause != null && cause instanceof NotServingRegionException)
382        || (cause != null && cause instanceof RegionServerStoppedException)
383        || e instanceof OutOfOrderScannerNextException || e instanceof UnknownScannerException
384        || e instanceof ScannerResetException || e instanceof LeaseException
385    ) {
386      // Pass. It is easier writing the if loop test as list of what is allowed rather than
387      // as a list of what is not allowed... so if in here, it means we do not throw.
388      if (retriesLeft <= 0) {
389        throw e; // no more retries
390      }
391    } else {
392      throw e;
393    }
394
395    // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
396    if (this.lastResult != null) {
397      // The region has moved. We need to open a brand new scanner at the new location.
398      // Reset the startRow to the row we've seen last so that the new scanner starts at
399      // the correct row. Otherwise we may see previously returned rows again.
400      // If the lastRow is not partial, then we should start from the next row. As now we can
401      // exclude the start row, the logic here is the same for both normal scan and reversed scan.
402      // If lastResult is partial then include it, otherwise exclude it.
403      scan.withStartRow(lastResult.getRow(), lastResult.mayHaveMoreCellsInRow());
404    }
405    if (e instanceof OutOfOrderScannerNextException) {
406      if (retryAfterOutOfOrderException.isTrue()) {
407        retryAfterOutOfOrderException.setValue(false);
408      } else {
409        // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
410        throw new DoNotRetryIOException(
411          "Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?", e);
412      }
413    }
414    // Clear region.
415    this.currentRegion = null;
416    // Set this to zero so we don't try and do an rpc and close on remote server when
417    // the exception we got was UnknownScanner or the Server is going down.
418    callable = null;
419  }
420
421  /**
422   * Contact the servers to load more {@link Result}s in the cache.
423   */
424  protected void loadCache() throws IOException {
425    // check if scanner was closed during previous prefetch
426    if (closed) {
427      return;
428    }
429    long remainingResultSize = maxScannerResultSize;
430    int countdown = this.caching;
431    // This is possible if we just stopped at the boundary of a region in the previous call.
432    if (callable == null && !moveToNextRegion()) {
433      closed = true;
434      return;
435    }
436    // This flag is set when we want to skip the result returned. We do
437    // this when we reset scanner because it split under us.
438    MutableBoolean retryAfterOutOfOrderException = new MutableBoolean(true);
439    // Even if we are retrying due to UnknownScannerException, ScannerResetException, etc. we should
440    // make sure that we are not retrying indefinitely.
441    int retriesLeft = getRetries();
442    for (;;) {
443      Result[] values;
444      try {
445        // Server returns a null values if scanning is to stop. Else,
446        // returns an empty array if scanning is to go on and we've just
447        // exhausted current region.
448        // now we will also fetch data when openScanner, so do not make a next call again if values
449        // is already non-null.
450        values = call(callable, caller, scannerTimeout, true);
451        // When the replica switch happens, we need to do certain operations again.
452        // The callable will openScanner with the right startkey but we need to pick up
453        // from there. Bypass the rest of the loop and let the catch-up happen in the beginning
454        // of the loop as it happens for the cases where we see exceptions.
455        if (callable.switchedToADifferentReplica()) {
456          // Any accumulated partial results are no longer valid since the callable will
457          // openScanner with the correct startkey and we must pick up from there
458          scanResultCache.clear();
459          this.currentRegion = callable.getHRegionInfo();
460        }
461        retryAfterOutOfOrderException.setValue(true);
462      } catch (DoNotRetryIOException e) {
463        handleScanError(e, retryAfterOutOfOrderException, retriesLeft--);
464        // reopen the scanner
465        if (!moveToNextRegion()) {
466          break;
467        }
468        continue;
469      }
470      long currentTime = EnvironmentEdgeManager.currentTime();
471      if (this.scanMetrics != null) {
472        this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
473      }
474      lastNext = currentTime;
475      // Groom the array of Results that we received back from the server before adding that
476      // Results to the scanner's cache. If partial results are not allowed to be seen by the
477      // caller, all book keeping will be performed within this method.
478      int numberOfCompleteRowsBefore = scanResultCache.numberOfCompleteRows();
479      Result[] resultsToAddToCache =
480        scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
481      int numberOfCompleteRows =
482        scanResultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
483      for (Result rs : resultsToAddToCache) {
484        cache.add(rs);
485        long estimatedHeapSizeOfResult = calcEstimatedSize(rs);
486        countdown--;
487        remainingResultSize -= estimatedHeapSizeOfResult;
488        addEstimatedSize(estimatedHeapSizeOfResult);
489        this.lastResult = rs;
490      }
491
492      if (scan.getLimit() > 0) {
493        int newLimit = scan.getLimit() - numberOfCompleteRows;
494        assert newLimit >= 0;
495        scan.setLimit(newLimit);
496      }
497      if (scan.getLimit() == 0 || scanExhausted()) {
498        closeScanner();
499        closed = true;
500        break;
501      }
502      boolean regionExhausted = regionExhausted(values);
503      if (callable.isHeartbeatMessage()) {
504        if (!cache.isEmpty()) {
505          // Caller of this method just wants a Result. If we see a heartbeat message, it means
506          // processing of the scan is taking a long time server side. Rather than continue to
507          // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
508          // unnecesary delays to the caller
509          LOG.trace("Heartbeat message received and cache contains Results. "
510            + "Breaking out of scan loop");
511          // we know that the region has not been exhausted yet so just break without calling
512          // closeScannerIfExhausted
513          break;
514        }
515      }
516      if (cache.isEmpty() && !closed && scan.isNeedCursorResult()) {
517        if (callable.isHeartbeatMessage() && callable.getCursor() != null) {
518          // Use cursor row key from server
519          cache.add(Result.createCursorResult(callable.getCursor()));
520          break;
521        }
522        if (values.length > 0) {
523          // It is size limit exceed and we need return the last Result's row.
524          // When user setBatch and the scanner is reopened, the server may return Results that
525          // user has seen and the last Result can not be seen because the number is not enough.
526          // So the row keys of results may not be same, we must use the last one.
527          cache.add(Result.createCursorResult(new Cursor(values[values.length - 1].getRow())));
528          break;
529        }
530      }
531      if (countdown <= 0) {
532        // we have enough result.
533        closeScannerIfExhausted(regionExhausted);
534        break;
535      }
536      if (remainingResultSize <= 0) {
537        if (!cache.isEmpty()) {
538          closeScannerIfExhausted(regionExhausted);
539          break;
540        } else {
541          // we have reached the max result size but we still can not find anything to return to the
542          // user. Reset the maxResultSize and try again.
543          remainingResultSize = maxScannerResultSize;
544        }
545      }
546      // we are done with the current region
547      if (regionExhausted) {
548        if (!moveToNextRegion()) {
549          closed = true;
550          break;
551        }
552      }
553    }
554  }
555
556  protected void addEstimatedSize(long estimatedHeapSizeOfResult) {
557    return;
558  }
559
560  public int getCacheCount() {
561    return cache != null ? cache.size() : 0;
562  }
563
564  @Override
565  public void close() {
566    try (Scope ignored = span.makeCurrent()) {
567      if (!scanMetricsPublished) {
568        writeScanMetrics();
569      }
570      if (callable != null) {
571        callable.setClose();
572        try {
573          call(callable, caller, scannerTimeout, false);
574        } catch (UnknownScannerException e) {
575          // We used to catch this error, interpret, and rethrow. However, we
576          // have since decided that it's not nice for a scanner's close to
577          // throw exceptions. Chances are it was just due to lease time out.
578          LOG.debug("scanner failed to close", e);
579        } catch (IOException e) {
580          /* An exception other than UnknownScanner is unexpected. */
581          LOG.warn("scanner failed to close.", e);
582          span.recordException(e);
583          span.setStatus(StatusCode.ERROR);
584        }
585        callable = null;
586      }
587      closed = true;
588      span.setStatus(StatusCode.OK);
589    } finally {
590      span.end();
591    }
592  }
593
594  @Override
595  public boolean renewLease() {
596    try (Scope ignored = span.makeCurrent()) {
597      if (callable == null) {
598        return false;
599      }
600      // do not return any rows, do not advance the scanner
601      callable.setRenew(true);
602      try {
603        this.caller.callWithoutRetries(callable, this.scannerTimeout);
604        return true;
605      } catch (Exception e) {
606        LOG.debug("scanner failed to renew lease", e);
607        span.recordException(e);
608        return false;
609      } finally {
610        callable.setRenew(false);
611      }
612    }
613  }
614
615  protected void initCache() {
616    initSyncCache();
617  }
618
619  @Override
620  public Result next() throws IOException {
621    try (Scope ignored = span.makeCurrent()) {
622      return nextWithSyncCache();
623    }
624  }
625}