/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.LeaseException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Implements the scanner interface for the HBase client. If there are multiple regions in a table,
 * this scanner will iterate through them all.
 */
@InterfaceAudience.Private
public abstract class ClientScanner extends AbstractClientScanner {

  private static final Logger LOG = LoggerFactory.getLogger(ClientScanner.class);

  protected final Scan scan;
  // We clone the original client Scan to avoid modifying user object from scan internals.
  // The below scanForMetrics is the client's object, which we mutate only for returning
  // ScanMetrics.
  // See https://issues.apache.org/jira/browse/HBASE-27402.
  private final Scan scanForMetrics;

  protected boolean closed = false;
  // Current region scanner is against. Gets cleared if current region goes
  // wonky: e.g. if it splits on us.
  protected HRegionInfo currentRegion = null;
  protected ScannerCallableWithReplicas callable = null;
  protected Queue<Result> cache;
  private final ScanResultCache scanResultCache;
  protected final int caching;
  protected long lastNext;
  // Keep lastResult returned successfully in case we have to reset scanner.
  protected Result lastResult = null;
  protected final long maxScannerResultSize;
  private final ClusterConnection connection;
  protected final TableName tableName;
  protected final int readRpcTimeout;
  protected final int scannerTimeout;
  private final boolean useScannerTimeoutForNextCalls;
  protected boolean scanMetricsPublished = false;
  protected RpcRetryingCaller<Result[]> caller;
  protected RpcControllerFactory rpcControllerFactory;
  protected Configuration conf;
  protected final Span span;
  // The timeout on the primary. Applicable if there are multiple replicas for a region
  // In that case, we will only wait for this much timeout on the primary before going
  // to the replicas and trying the same scan. Note that the retries will still happen
  // on each replica and the first successful results will be taken. A timeout of 0 is
  // disallowed.
  protected final int primaryOperationTimeout;
  // Assigned once in the constructor; bounds the number of scanner resets in loadCache().
  private final int retries;
  protected final ExecutorService pool;
  protected final Map<String, byte[]> requestAttributes;

  /**
   * Create a new ClientScanner for the specified table. Note that the passed {@link Scan}'s start
   * row may be changed.
   * @param conf                    The {@link Configuration} to use.
   * @param scan                    {@link Scan} to use in this scanner
   * @param scanForMetrics          the {@link Scan} object on which scan metrics are published
   *                                (see {@link #writeScanMetrics()})
   * @param tableName               The table that we wish to scan
   * @param connection              Connection identifying the cluster
   * @param rpcFactory              factory used to create the {@link RpcRetryingCaller} for this
   *                                scanner
   * @param controllerFactory       RPC controller factory kept for use by subclasses
   * @param pool                    executor handed to the {@link ScannerCallableWithReplicas}
   * @param scanReadRpcTimeout      read RPC timeout handed to the
   *                                {@link ScannerCallableWithReplicas}
   * @param scannerTimeout          timeout used for each scanner call issued by this class
   * @param primaryOperationTimeout how long to wait on the primary replica before trying the
   *                                other replicas
   * @param connectionConfiguration supplies the retry count and the defaults for max result size
   *                                and caching when the {@link Scan} does not set them
   * @param requestAttributes       request attributes kept for use by subclasses
   * @throws IOException declared for subclasses and callers; not thrown by the statements visible
   *                     in this constructor
   */
  public ClientScanner(final Configuration conf, final Scan scan, final Scan scanForMetrics,
    final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
    RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
    int scannerTimeout, int primaryOperationTimeout,
    ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
    throws IOException {
    this.scanForMetrics = scanForMetrics;
    if (LOG.isTraceEnabled()) {
      LOG.trace(
        "Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
    }
    this.scan = scan;
    this.tableName = tableName;
    this.lastNext = EnvironmentEdgeManager.currentTime();
    this.connection = connection;
    this.pool = pool;
    this.primaryOperationTimeout = primaryOperationTimeout;
    this.retries = connectionConfiguration.getRetriesNumber();
    // A max result size set on the Scan wins over the connection-level default.
    if (scan.getMaxResultSize() > 0) {
      this.maxScannerResultSize = scan.getMaxResultSize();
    } else {
      this.maxScannerResultSize = connectionConfiguration.getScannerMaxResultSize();
    }
    this.readRpcTimeout = scanReadRpcTimeout;
    this.scannerTimeout = scannerTimeout;
    this.useScannerTimeoutForNextCalls = connectionConfiguration.isUseScannerTimeoutForNextCalls();
    this.requestAttributes = requestAttributes;

    // check if application wants to collect scan metrics
    initScanMetrics(scan);

    // Use the caching from the Scan. If not set, use the default cache setting for this table.
    if (this.scan.getCaching() > 0) {
      this.caching = this.scan.getCaching();
    } else {
      this.caching = connectionConfiguration.getScannerCaching();
    }

    this.caller = rpcFactory.<Result[]> newCaller();
    this.rpcControllerFactory = controllerFactory;

    this.conf = conf;
    // Capture the span that is current at construction time; next/close/renewLease re-activate it.
    this.span = Span.current();

    this.scanResultCache = createScanResultCache(scan);
    initCache();
  }

  /** Returns the replica id of the scan, never lower than the default replica id. */
  protected final int getScanReplicaId() {
    return Math.max(scan.getReplicaId(), RegionReplicaUtil.DEFAULT_REPLICA_ID);
  }

  protected ClusterConnection getConnection() {
    return this.connection;
  }

  protected TableName getTable() {
    return this.tableName;
  }

  protected int getRetries() {
    return this.retries;
  }

  protected int getScannerTimeout() {
    return this.scannerTimeout;
  }

  protected Configuration getConf() {
    return this.conf;
  }

  protected Scan getScan() {
    return scan;
  }

  protected ExecutorService getPool() {
    return pool;
  }

  protected int getPrimaryOperationTimeout() {
    return primaryOperationTimeout;
  }

  protected int getCaching() {
    return caching;
  }

  /** Returns the time recorded for the most recent scanner call (the {@code lastNext} field). */
  protected long getTimestamp() {
    return lastNext;
  }

  protected long getMaxResultSize() {
    return maxScannerResultSize;
  }

  /**
   * Issues a close call for the current callable, if any, and clears it. If the call throws, the
   * callable is left in place and the exception propagates to the caller.
   */
  private void closeScanner() throws IOException {
    if (this.callable != null) {
      this.callable.setClose();
      call(callable, caller, scannerTimeout, false);
      this.callable = null;
    }
  }

  /**
   * Will be called in moveToNextRegion when currentRegion is not null. Abstract because for normal
   * scan, we will start next scan from the endKey of the currentRegion, and for reversed scan, we
   * will start next scan from the startKey of the currentRegion.
   * @return {@code false} if we have reached the stop row. Otherwise {@code true}.
   */
  protected abstract boolean setNewStartKey();

  /**
   * Will be called in moveToNextRegion to create ScannerCallable. Abstract because for reversed
   * scan we need to create a ReversedScannerCallable.
   */
  protected abstract ScannerCallable createScannerCallable();

  /**
   * Close the previous scanner and create a new ScannerCallable for the next scanner.
   * <p>
   * Marked as protected only because TestClientScanner need to override this method.
   * @return {@code false} if we should terminate the scan. Otherwise {@code true}.
   */
  protected boolean moveToNextRegion() {
    // Close the previous scanner if it's open
    try {
      closeScanner();
    } catch (IOException e) {
      // not a big deal continue
      LOG.debug("close scanner for {} failed", currentRegion, e);
    }
    if (currentRegion != null) {
      if (!setNewStartKey()) {
        return false;
      }
      scan.resetMvccReadPoint();
      LOG.trace("Finished {}", this.currentRegion);
    }
    if (LOG.isDebugEnabled() && this.currentRegion != null) {
      // Only worth logging if NOT first region in scan.
      LOG.debug(
        "Advancing internal scanner to startKey at '" + Bytes.toStringBinary(scan.getStartRow())
          + "', " + (scan.includeStartRow() ? "inclusive" : "exclusive"));
    }
    // clear the current region, we will set a new value to it after the first call of the new
    // callable.
    this.currentRegion = null;
    this.callable = new ScannerCallableWithReplicas(getTable(), getConnection(),
      createScannerCallable(), pool, primaryOperationTimeout, scan, getRetries(), readRpcTimeout,
      scannerTimeout, useScannerTimeoutForNextCalls, caching, conf, caller);
    this.callable.setCaching(this.caching);
    incRegionCountMetrics(scanMetrics);
    return true;
  }

  /**
   * Delegates to the current callable. Callers must ensure a callable exists; this method does
   * not guard against {@code callable} being {@code null}.
   */
  boolean isAnyRPCcancelled() {
    return callable.isAnyRPCcancelled();
  }

  /**
   * Runs the given callable once through the given caller.
   * @param callable            the scanner callable to execute
   * @param caller              the caller used to execute it
   * @param scannerTimeout      timeout passed to {@code callWithoutRetries}
   * @param updateCurrentRegion whether to record the callable's region as {@code currentRegion}
   *                            when no current region is set
   * @return the results returned by the call
   * @throws InterruptedIOException if the calling thread was interrupted before the call
   * @throws IOException            if the call fails
   */
  private Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller<Result[]> caller,
    int scannerTimeout, boolean updateCurrentRegion) throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException();
    }
    // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
    // we do a callWithRetries
    Result[] rrs = caller.callWithoutRetries(callable, scannerTimeout);
    if (currentRegion == null && updateCurrentRegion) {
      currentRegion = callable.getHRegionInfo();
    }
    return rrs;
  }

  /**
   * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
   * application or TableInputFormat. Later, we could push it to other systems. We don't use metrics
   * framework because it doesn't support multi-instances of the same metrics on the same machine;
   * for scan/map reduce scenarios, we will have multiple scans running at the same time. By
   * default, scan metrics are disabled; if the application wants to collect them, this behavior can
   * be turned on by calling {@link Scan#setScanMetricsEnabled(boolean)}
   */
  protected void writeScanMetrics() {
    if (this.scanMetrics == null || scanMetricsPublished) {
      return;
    }
    // Publish ScanMetrics to the Scan Object.
    // As we have claimed in the comment of Scan.getScanMetrics, this relies on that user will not
    // call ResultScanner.getScanMetrics and reset the ScanMetrics. Otherwise the metrics published
    // to Scan will be messed up.
    scanForMetrics.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA,
      ProtobufUtil.toScanMetrics(scanMetrics, false).toByteArray());
    scanMetricsPublished = true;
  }

  protected void initSyncCache() {
    cache = new ArrayDeque<>();
  }

  /**
   * Returns the next cached {@link Result}, loading more results from the servers when the cache
   * is empty.
   * @return the next result, or {@code null} if the scanner is closed or no result is available
   *         after loading
   * @throws IOException if loading the cache fails
   */
  protected Result nextWithSyncCache() throws IOException {
    Result result = cache.poll();
    if (result != null) {
      return result;
    }
    // If there is nothing left in the cache and the scanner is closed,
    // return a no-op
    if (this.closed) {
      return null;
    }

    loadCache();

    // try again to load from cache
    result = cache.poll();

    // if we exhausted this scanner before calling close, write out the scan metrics
    if (result == null) {
      writeScanMetrics();
    }
    return result;
  }

  /** Returns the number of results currently cached, or 0 if the cache is not initialized. */
  public int getCacheSize() {
    return cache != null ? cache.size() : 0;
  }

  private boolean scanExhausted() {
    return callable.moreResultsForScan() == MoreResults.NO;
  }

  private boolean regionExhausted(Result[] values) {
    // 1. Not a heartbeat message and we get nothing, this means the region is exhausted. And in the
    // old time we always return empty result for a open scanner operation so we add a check here to
    // keep compatible with the old logic. Should remove the isOpenScanner in the future.
    // 2. Server tells us that it has no more results for this region.
    return (values.length == 0 && !callable.isHeartbeatMessage())
      || callable.moreResultsInRegion() == MoreResults.NO;
  }

  private void closeScannerIfExhausted(boolean exhausted) throws IOException {
    if (exhausted) {
      closeScanner();
    }
  }

  /**
   * Decides whether a {@link DoNotRetryIOException} from a scanner call ends the scan or only
   * requires the region scanner to be reset. Returns normally when the scanner should be reopened;
   * in that case the scan start row, {@code currentRegion} and {@code callable} have been reset.
   * @param e                             the exception thrown by the scanner call
   * @param retryAfterOutOfOrderException {@code true} while one retry after an
   *                                      {@link OutOfOrderScannerNextException} is still allowed;
   *                                      flipped to {@code false} when that retry is consumed
   * @param retriesLeft                   number of resets still allowed
   * @throws DoNotRetryIOException if the exception is not recoverable, retries are used up, or an
   *                               {@link OutOfOrderScannerNextException} repeats after its retry
   */
  private void handleScanError(DoNotRetryIOException e,
    MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws DoNotRetryIOException {
    // An exception was thrown which makes any partial results that we were collecting
    // invalid. The scanner will need to be reset to the beginning of a row.
    scanResultCache.clear();

    // Unfortunately, DNRIOE is used in two different semantics.
    // (1) The first is to close the client scanner and bubble up the exception all the way
    // to the application. This is preferred when the exception is really un-recoverable
    // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
    // bucket usually.
    // (2) Second semantics is to close the current region scanner only, but continue the
    // client scanner by overriding the exception. This is usually UnknownScannerException,
    // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
    // application-level ClientScanner has to continue without bubbling up the exception to
    // the client. See RSRpcServices to see how it throws DNRIOE's.
    // See also: HBASE-16604, HBASE-17187

    // If exception is any but the list below throw it back to the client; else setup
    // the scanner and retry.
    Throwable cause = e.getCause();
    // instanceof is already false for null, so no separate null check on cause is needed.
    if (
      cause instanceof NotServingRegionException || cause instanceof RegionServerStoppedException
        || e instanceof OutOfOrderScannerNextException || e instanceof UnknownScannerException
        || e instanceof ScannerResetException || e instanceof LeaseException
    ) {
      // Pass. It is easier writing the if test as list of what is allowed rather than
      // as a list of what is not allowed... so if in here, it means we do not throw.
      if (retriesLeft <= 0) {
        throw e; // no more retries
      }
    } else {
      throw e;
    }

    // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
    if (this.lastResult != null) {
      // The region has moved. We need to open a brand new scanner at the new location.
      // Reset the startRow to the row we've seen last so that the new scanner starts at
      // the correct row. Otherwise we may see previously returned rows again.
      // If the lastRow is not partial, then we should start from the next row. As now we can
      // exclude the start row, the logic here is the same for both normal scan and reversed scan.
      // If lastResult is partial then include it, otherwise exclude it.
      scan.withStartRow(lastResult.getRow(), lastResult.mayHaveMoreCellsInRow());
    }
    if (e instanceof OutOfOrderScannerNextException) {
      if (retryAfterOutOfOrderException.isTrue()) {
        retryAfterOutOfOrderException.setValue(false);
      } else {
        // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
        throw new DoNotRetryIOException(
          "Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?", e);
      }
    }
    // Clear region.
    this.currentRegion = null;
    // Set this to null so we don't try and do an rpc and close on remote server when
    // the exception we got was UnknownScanner or the Server is going down.
    callable = null;
  }

  /**
   * Contact the servers to load more {@link Result}s in the cache.
   */
  protected void loadCache() throws IOException {
    // check if scanner was closed during previous prefetch
    if (closed) {
      return;
    }
    long remainingResultSize = maxScannerResultSize;
    int countdown = this.caching;
    // This is possible if we just stopped at the boundary of a region in the previous call.
    if (callable == null && !moveToNextRegion()) {
      closed = true;
      return;
    }
    // This flag is set when we want to skip the result returned. We do
    // this when we reset scanner because it split under us.
    MutableBoolean retryAfterOutOfOrderException = new MutableBoolean(true);
    // Even if we are retrying due to UnknownScannerException, ScannerResetException, etc. we should
    // make sure that we are not retrying indefinitely.
    int retriesLeft = getRetries();
    for (;;) {
      Result[] values;
      try {
        // Server returns a null values if scanning is to stop. Else,
        // returns an empty array if scanning is to go on and we've just
        // exhausted current region.
        // now we will also fetch data when openScanner, so do not make a next call again if values
        // is already non-null.
        values = call(callable, caller, scannerTimeout, true);
        // When the replica switch happens, we need to do certain operations again.
        // The callable will openScanner with the right startkey but we need to pick up
        // from there. Bypass the rest of the loop and let the catch-up happen in the beginning
        // of the loop as it happens for the cases where we see exceptions.
        if (callable.switchedToADifferentReplica()) {
          // Any accumulated partial results are no longer valid since the callable will
          // openScanner with the correct startkey and we must pick up from there
          scanResultCache.clear();
          this.currentRegion = callable.getHRegionInfo();
        }
        retryAfterOutOfOrderException.setValue(true);
      } catch (DoNotRetryIOException e) {
        handleScanError(e, retryAfterOutOfOrderException, retriesLeft--);
        // reopen the scanner
        if (!moveToNextRegion()) {
          break;
        }
        continue;
      }
      long currentTime = EnvironmentEdgeManager.currentTime();
      if (this.scanMetrics != null) {
        this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
      }
      lastNext = currentTime;
      // Groom the array of Results that we received back from the server before adding that
      // Results to the scanner's cache. If partial results are not allowed to be seen by the
      // caller, all book keeping will be performed within this method.
      int numberOfCompleteRowsBefore = scanResultCache.numberOfCompleteRows();
      Result[] resultsToAddToCache =
        scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
      int numberOfCompleteRows =
        scanResultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
      for (Result rs : resultsToAddToCache) {
        cache.add(rs);
        long estimatedHeapSizeOfResult = calcEstimatedSize(rs);
        countdown--;
        remainingResultSize -= estimatedHeapSizeOfResult;
        addEstimatedSize(estimatedHeapSizeOfResult);
        this.lastResult = rs;
      }

      // Shrink the remaining row limit by the number of rows completed in this round.
      if (scan.getLimit() > 0) {
        int newLimit = scan.getLimit() - numberOfCompleteRows;
        assert newLimit >= 0;
        scan.setLimit(newLimit);
      }
      if (scan.getLimit() == 0 || scanExhausted()) {
        closeScanner();
        closed = true;
        break;
      }
      boolean regionExhausted = regionExhausted(values);
      if (callable.isHeartbeatMessage()) {
        if (!cache.isEmpty()) {
          // Caller of this method just wants a Result. If we see a heartbeat message, it means
          // processing of the scan is taking a long time server side. Rather than continue to
          // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
          // unnecessary delays to the caller
          LOG.trace("Heartbeat message received and cache contains Results. "
            + "Breaking out of scan loop");
          // we know that the region has not been exhausted yet so just break without calling
          // closeScannerIfExhausted
          break;
        }
      }
      if (cache.isEmpty() && !closed && scan.isNeedCursorResult()) {
        if (callable.isHeartbeatMessage() && callable.getCursor() != null) {
          // Use cursor row key from server
          cache.add(Result.createCursorResult(callable.getCursor()));
          break;
        }
        if (values.length > 0) {
          // It is size limit exceed and we need return the last Result's row.
          // When user setBatch and the scanner is reopened, the server may return Results that
          // user has seen and the last Result can not be seen because the number is not enough.
          // So the row keys of results may not be same, we must use the last one.
          cache.add(Result.createCursorResult(new Cursor(values[values.length - 1].getRow())));
          break;
        }
      }
      if (countdown <= 0) {
        // we have enough result.
        closeScannerIfExhausted(regionExhausted);
        break;
      }
      if (remainingResultSize <= 0) {
        if (!cache.isEmpty()) {
          closeScannerIfExhausted(regionExhausted);
          break;
        } else {
          // we have reached the max result size but we still can not find anything to return to the
          // user. Reset the maxResultSize and try again.
          remainingResultSize = maxScannerResultSize;
        }
      }
      // we are done with the current region
      if (regionExhausted) {
        if (!moveToNextRegion()) {
          closed = true;
          break;
        }
      }
    }
  }

  /**
   * Hook invoked from {@link #loadCache()} for every result added to the cache. Does nothing in
   * this class.
   * @param estimatedHeapSizeOfResult estimated size of the result that was just cached
   */
  protected void addEstimatedSize(long estimatedHeapSizeOfResult) {
    // Intentionally empty.
  }

  /** Returns the number of results currently cached, or 0 if the cache is not initialized. */
  public int getCacheCount() {
    return cache != null ? cache.size() : 0;
  }

  /**
   * Publishes scan metrics if needed, issues a close call for the open callable (if any), marks
   * this scanner closed and ends the span. I/O failures of the close call are logged rather than
   * thrown; an unexpected failure leaves the span status as {@link StatusCode#ERROR}.
   */
  @Override
  public void close() {
    try (Scope ignored = span.makeCurrent()) {
      if (!scanMetricsPublished) {
        writeScanMetrics();
      }
      // Remember an unexpected failure so the ERROR status is not overwritten by OK below.
      boolean closeFailed = false;
      if (callable != null) {
        callable.setClose();
        try {
          call(callable, caller, scannerTimeout, false);
        } catch (UnknownScannerException e) {
          // We used to catch this error, interpret, and rethrow. However, we
          // have since decided that it's not nice for a scanner's close to
          // throw exceptions. Chances are it was just due to lease time out.
          LOG.debug("scanner failed to close", e);
        } catch (IOException e) {
          /* An exception other than UnknownScanner is unexpected. */
          LOG.warn("scanner failed to close.", e);
          span.recordException(e);
          span.setStatus(StatusCode.ERROR);
          closeFailed = true;
        }
        callable = null;
      }
      closed = true;
      if (!closeFailed) {
        span.setStatus(StatusCode.OK);
      }
    } finally {
      span.end();
    }
  }

  /**
   * Sends a renew request through the current callable without fetching rows.
   * @return {@code true} if the renew call completed, {@code false} if there is no callable or the
   *         call failed
   */
  @Override
  public boolean renewLease() {
    try (Scope ignored = span.makeCurrent()) {
      if (callable == null) {
        return false;
      }
      // do not return any rows, do not advance the scanner
      callable.setRenew(true);
      try {
        this.caller.callWithoutRetries(callable, this.scannerTimeout);
        return true;
      } catch (Exception e) {
        LOG.debug("scanner failed to renew lease", e);
        span.recordException(e);
        return false;
      } finally {
        // Always switch the callable back to normal mode, whether or not the renew succeeded.
        callable.setRenew(false);
      }
    }
  }

  protected void initCache() {
    initSyncCache();
  }

  @Override
  public Result next() throws IOException {
    try (Scope ignored = span.makeCurrent()) {
      return nextWithSyncCache();
    }
  }
}