/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
 */
@InterfaceAudience.Private
class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {

  private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class);

  // Package local for testability
  KeyValueHeap storeHeap = null;

  /**
   * Heap of key-values that are not essential for the provided filters and are thus read on
   * demand, if on-demand column family loading is enabled.
   */
  KeyValueHeap joinedHeap = null;
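
  // Note: storeHeap merges the scanners for the column families the filter deems essential,
  // while joinedHeap merges the remaining, lazily loaded families (see initializeScanners
  // below). Rows are first assembled from storeHeap; only rows that survive filtering fetch
  // their remaining cells from joinedHeap, which can avoid disk reads for rows the filter
  // drops.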

  /**
   * If the joined heap data gathering is interrupted due to scan limits, this will contain the
   * row for which we are populating the values.
   */
  protected ExtendedCell joinedContinuationRow = null;
  private boolean filterClosed = false;

  protected final byte[] stopRow;
  protected final boolean includeStopRow;
  protected final HRegion region;
  protected final CellComparator comparator;

  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  private final long readPt;
  private final long maxResultSize;
  private final ScannerContext defaultScannerContext;
  private final FilterWrapper filter;
  private final String operationId;

  private RegionServerServices rsServices;

  @Override
  public RegionInfo getRegionInfo() {
    return region.getRegionInfo();
  }

  private static boolean hasNonce(HRegion region, long nonce) {
    RegionServerServices rsServices = region.getRegionServerServices();
    return nonce != HConstants.NO_NONCE && rsServices != null
      && rsServices.getNonceManager() != null;
  }

  RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
    long nonceGroup, long nonce) throws IOException {
    this.region = region;
    this.maxResultSize = scan.getMaxResultSize();
    if (scan.hasFilter()) {
      this.filter = new FilterWrapper(scan.getFilter());
    } else {
      this.filter = null;
    }
    this.comparator = region.getCellComparator();
    /**
     * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
     * scanner context that can be used to enforce the batch limit in the event that a
     * ScannerContext is not specified during an invocation of next/nextRaw.
     */
    defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
    this.stopRow = scan.getStopRow();
    this.includeStopRow = scan.includeStopRow();
    this.operationId = scan.getId();

    // synchronize on scannerReadPoints so that nobody calculates
    // getSmallestReadPoint before scannerReadPoints is updated.
    IsolationLevel isolationLevel = scan.getIsolationLevel();
    long mvccReadPoint = ClientInternalHelper.getMvccReadPoint(scan);
    this.scannerReadPoints = region.scannerReadPoints;
    this.rsServices = region.getRegionServerServices();
    region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    try {
      if (mvccReadPoint > 0) {
        this.readPt = mvccReadPoint;
      } else if (hasNonce(region, nonce)) {
        this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
      } else {
        this.readPt = region.getReadPoint(isolationLevel);
      }
      scannerReadPoints.put(this, this.readPt);
    } finally {
      region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    }
    initializeScanners(scan, additionalScanners);
  }

  public ScannerContext getContext() {
    return defaultScannerContext;
  }

  private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
    throws IOException {
    // Here we separate all scanners into two lists - scanners that provide data required
    // by the filter to operate (scanners list) and all others (joinedScanners list).
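    // For example, with on-demand column family loading enabled
    // (Scan#setLoadColumnFamiliesOnDemand) and a SingleColumnValueFilter on family "a",
    // only "a" is essential (Filter#isFamilyEssential returns true for it); any other
    // requested family would land in joinedScanners and be read only for rows that pass
    // the filter. The family name here is purely illustrative.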
    List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());
    List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());
    // Store all already instantiated scanners for exception handling
    List<KeyValueScanner> instantiatedScanners = new ArrayList<>();
    // handle additionalScanners
    if (additionalScanners != null && !additionalScanners.isEmpty()) {
      scanners.addAll(additionalScanners);
      instantiatedScanners.addAll(additionalScanners);
    }

    try {
      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
        HStore store = region.getStore(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
        instantiatedScanners.add(scanner);
        if (
          this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
            || this.filter.isFamilyEssential(entry.getKey())
        ) {
          scanners.add(scanner);
        } else {
          joinedScanners.add(scanner);
        }
      }
      initializeKVHeap(scanners, joinedScanners, region);
    } catch (Throwable t) {
      throw handleException(instantiatedScanners, t);
    }
  }

  protected void initializeKVHeap(List<KeyValueScanner> scanners,
    List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
    this.storeHeap = new KeyValueHeap(scanners, comparator);
    if (!joinedScanners.isEmpty()) {
      this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
    }
  }

  private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) {
    // remove the scanner read point before throwing the exception
    scannerReadPoints.remove(this);
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
      if (joinedHeap != null) {
        joinedHeap.close();
        joinedHeap = null;
      }
    } else {
      // close all already instantiated scanners before throwing the exception
      for (KeyValueScanner scanner : instantiatedScanners) {
        scanner.close();
      }
    }
    return t instanceof IOException ? (IOException) t : new IOException(t);
  }

  @Override
  public long getMaxResultSize() {
    return maxResultSize;
  }

  @Override
  public long getMvccReadPoint() {
    return this.readPt;
  }

  @Override
  public int getBatch() {
    return this.defaultScannerContext.getBatchLimit();
  }

  @Override
  public String getOperationId() {
    return operationId;
  }

  /**
   * Reset the filters.
   * @throws IOException in case a filter raises an I/O exception.
   */
  protected final void resetFilters() throws IOException {
    if (filter != null) {
      filter.reset();
    }
  }

  @Override
  public boolean next(List<? super ExtendedCell> outResults) throws IOException {
    // apply the batching limit by default
    return next(outResults, defaultScannerContext);
  }

  @Override
  public synchronized boolean next(List<? super ExtendedCell> outResults,
    ScannerContext scannerContext) throws IOException {
    if (this.filterClosed) {
      throw new UnknownScannerException("Scanner was closed (timed out?) "
        + "after we renewed it. Could be caused by a very slow scanner "
        + "or a lengthy garbage collection");
    }
    region.startRegionOperation(Operation.SCAN);
    try {
      return nextRaw(outResults, scannerContext);
    } finally {
      region.closeRegionOperation(Operation.SCAN);
    }
  }

  @Override
  public boolean nextRaw(List<? super ExtendedCell> outResults) throws IOException {
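    // Note: unlike next() above, the nextRaw variants do not take the region operation
    // lock themselves; next() wraps this call in startRegionOperation/closeRegionOperation,
    // and direct callers of nextRaw are expected to manage the region operation the same way.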
    // Use the RegionScanner's context by default
    return nextRaw(outResults, defaultScannerContext);
  }

  @Override
  public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext scannerContext)
    throws IOException {
    if (storeHeap == null) {
      // scanner is closed
      throw new UnknownScannerException("Scanner was closed");
    }
    boolean moreValues = false;
    if (outResults.isEmpty()) {
      // Usually outResults is empty. This is true when next is called
      // to handle a scan or get operation.
      moreValues = nextInternal(outResults, scannerContext);
    } else {
      List<ExtendedCell> tmpList = new ArrayList<>();
      moreValues = nextInternal(tmpList, scannerContext);
      outResults.addAll(tmpList);
    }

    region.addReadRequestsCount(1);
    if (region.getMetrics() != null) {
      region.getMetrics().updateReadRequestCount();
    }

    // If the size limit was reached it means a partial Result is being returned. Returning a
    // partial Result means that we should not reset the filters; filters should only be reset
    // in between rows.
    if (!scannerContext.mayHaveMoreCellsInRow()) {
      resetFilters();
    }

    if (isFilterDoneInternal()) {
      moreValues = false;
    }
    return moreValues;
  }

  /** Returns true if more cells exist after this batch, false if scanner is done */
  private boolean populateFromJoinedHeap(List<? super ExtendedCell> results,
    ScannerContext scannerContext) throws IOException {
    assert joinedContinuationRow != null;
    boolean moreValues =
      populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow);

    if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
      // We are done with this row, reset the continuation.
      joinedContinuationRow = null;
    }
    // As the data is obtained from two independent heaps, we need to
    // ensure that the result list is sorted, because Result relies on that.
    ((List<Cell>) results).sort(comparator);
    return moreValues;
  }

  /**
   * Fetches records with currentRow into the results list, until the next row, the batchLimit (if
   * not -1), or the remainingResultSize (if not -1) is reached.
   * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before
   *             this call.
   * @return state of last call to {@link KeyValueHeap#next()}
   */
  private boolean populateResult(List<? super ExtendedCell> results, KeyValueHeap heap,
    ScannerContext scannerContext, ExtendedCell currentRowCell) throws IOException {
    Cell nextKv;
    boolean moreCellsInRow = false;
    boolean tmpKeepProgress = scannerContext.getKeepProgress();
    // Scanning between column families and thus the scope is between cells
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;
    do {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // We want to maintain any progress that is made towards the limits while scanning across
      // different column families. To do this, we toggle the keep progress flag on during calls
      // to the StoreScanner to ensure that any progress made thus far is not wiped away.
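      // For example (illustrative numbers): with a batch limit of 10, the scanner for one
      // column family may contribute 6 cells for the row; forcing keepProgress on means the
      // next heap.next() call continues from 6/10 rather than restarting at 0/10, so the
      // limit applies to the row as a whole instead of per column family.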
      scannerContext.setKeepProgress(true);
      heap.next(results, scannerContext);
      scannerContext.setKeepProgress(tmpKeepProgress);

      nextKv = heap.peek();
      moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
      if (!moreCellsInRow) {
        incrementCountOfRowsScannedMetric(scannerContext);
      }
      if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
        return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
      } else if (scannerContext.checkSizeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      } else if (scannerContext.checkTimeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      }
    } while (moreCellsInRow);
    return nextKv != null;
  }

  /**
   * Based on the nextKv in the heap and the current row, decide whether or not there are more
   * cells to be read in the heap. If the row of the nextKv in the heap matches the current row,
   * then there are more cells to be read in the row.
   * @return true when there are more cells in the row to be read
   */
  private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
    return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);
  }

  /** Returns true if a filter has ruled that the scan is done. */
  @Override
  public synchronized boolean isFilterDone() throws IOException {
    return isFilterDoneInternal();
  }

  private boolean isFilterDoneInternal() throws IOException {
    return this.filter != null && this.filter.filterAllRemaining();
  }

  private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException {
    if (rpcCall.isPresent()) {
      // If a user specifies a too-restrictive or too-slow scanner, the
      // client might time out and disconnect while the server side
      // is still processing the request. We should abort aggressively
      // in that case.
      long afterTime = rpcCall.get().disconnectSince();
      if (afterTime >= 0) {
        throw new CallerDisconnectedException(
          "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this
            + " after " + afterTime + " ms, since caller disconnected");
      }
    }
  }

  private void resetProgress(ScannerContext scannerContext, int initialBatchProgress,
    long initialSizeProgress, long initialHeapSizeProgress) {
    // Starting to scan a new row. Reset the scanner progress according to whether or not
    // progress should be kept.
    if (scannerContext.getKeepProgress()) {
      // Progress should be kept. Reset to initial values seen at start of method invocation.
      scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
    } else {
      scannerContext.clearProgress();
    }
  }

  private boolean nextInternal(List<? super ExtendedCell> results, ScannerContext scannerContext)
    throws IOException {
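    // Overview of the loop below: peek the storeHeap for the current row, check the stop row
    // and filterRowKey, gather the row's essential cells via populateResult, apply the
    // row-level filter (filterRowCellsWithRet) if present, then fetch any non-essential
    // cells from the joinedHeap. Rows that end up empty loop back to the next row; all
    // other outcomes return through a ScannerContext state.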
    Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list");
    Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null");
    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();

    // Save the initial progress from the Scanner context in these local variables. The progress
    // may need to be reset a few times if rows are being filtered out, so we save the initial
    // progress.
    int initialBatchProgress = scannerContext.getBatchProgress();
    long initialSizeProgress = scannerContext.getDataSizeProgress();
    long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();

    // Used to check time limit
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;

    // The loop here is used only when, at some point during the next, we determine that due to
    // effects of filters or otherwise we have an empty row in the result. Then we loop and try
    // again. Otherwise, we must get out on the first iteration via return: "true" if there's more
    // data to read, "false" if there isn't (storeHeap is at a stop row, and joinedHeap has no
    // more data to read for the last row, if set in joinedContinuationRow).
    while (true) {
      resetProgress(scannerContext, initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
      checkClientDisconnect(rpcCall);

      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // Let's see what we have in the storeHeap.
      ExtendedCell current = this.storeHeap.peek();

      boolean shouldStop = shouldStop(current);
      // When hasFilterRow is true, it means that all the cells for a particular row must be read
      // before a filtering decision can be made. This means that filters where hasFilterRow is
      // true run the risk of encountering out of memory errors when they are applied to a table
      // that has very large rows.
      boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

      // If filter#hasFilterRow is true, partial results are not allowed since allowing them
      // would prevent the filters from being evaluated. Thus, if it is true, change the
      // scope of any limits that could potentially create partial results to
      // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row.
      if (hasFilterRow) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
            + " formed. Changing scope of limits that may create partials");
        }
        scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
        scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
        limitScope = LimitScope.BETWEEN_ROWS;
      }

      if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
        if (hasFilterRow) {
          throw new IncompatibleFilterException(
            "Filter whose hasFilterRow() returns true is incompatible with scans that must "
              + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
        }
        return true;
      }

      // Check if we were getting data from the joinedHeap and hit the limit.
      // If not, then it's the main path - getting results from the storeHeap.
      if (joinedContinuationRow == null) {
        // First, check if we are at a stop row. If so, there are no more results.
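        // (shouldStop() below compares against the scan's stopRow: for example, with
        // stopRow "rowB" and includeStopRow == false, reaching "rowB" stops the scan;
        // with includeStopRow == true, the scan stops after "rowB" has been returned.)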
        if (shouldStop) {
          if (hasFilterRow) {
            filter.filterRowCells((List<Cell>) results);
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Check if the rowkey filter wants to exclude this row. If so, loop to next.
        // Technically, if we hit limits before on this row, we don't need this call.
        if (filterRowKey(current)) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          // early check, see HBASE-16296
          if (isFilterDoneInternal()) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          // Typically the count of rows scanned is incremented inside #populateResult. However,
          // here we are filtering a row based purely on its row key, preventing us from calling
          // #populateResult. Thus, perform the necessary increment of the rows scanned metric
          // here.
          incrementCountOfRowsScannedMetric(scannerContext);
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          results.clear();

          // Read nothing as the rowkey was filtered, but still need to check the time limit.
          // We also check the size limit because we might have read blocks in getting to this
          // point.
          if (scannerContext.checkAnyLimitReached(limitScope)) {
            return true;
          }
          continue;
        }

        // Ok, we are good, let's try to get some results from the main heap.
        populateResult(results, this.storeHeap, scannerContext, current);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          if (hasFilterRow) {
            throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
          }
          return true;
        }

        // Check for thread interrupt status in case we have been signaled from
        // #interruptRegionOperation.
        region.checkInterrupt();

        Cell nextKv = this.storeHeap.peek();
        shouldStop = shouldStop(nextKv);
        // Save whether the row was empty before filters were applied to it.
        final boolean isEmptyRow = results.isEmpty();

        // We have the part of the row necessary for filtering (all of it, usually).
        // First filter with the filterRow(List).
        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
        if (hasFilterRow) {
          ret = filter.filterRowCellsWithRet((List<Cell>) results);

          // We don't know how the results have changed after being filtered. Must set progress
          // according to contents of results now.
          if (scannerContext.getKeepProgress()) {
            scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
              initialHeapSizeProgress);
          } else {
            scannerContext.clearProgress();
          }
          scannerContext.incrementBatchProgress(results.size());
          for (ExtendedCell cell : (List<ExtendedCell>) results) {
            scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
              cell.heapSize());
          }
        }

        if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          results.clear();
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // This row was totally filtered out; if this is NOT the last row,
          // we should continue on. Otherwise, there is nothing else to do.
          if (!shouldStop) {
            // Read nothing as the cells were filtered, but still need to check the time limit.
            // We also check the size limit because we might have read blocks in getting to this
            // point.
            if (scannerContext.checkAnyLimitReached(limitScope)) {
              return true;
            }
            continue;
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Ok, we are done with the storeHeap for this row.
        // Now we may need to fetch additional, non-essential data into the row.
        // These values are not needed for the filter to work, so we postpone their
        // fetch to (possibly) reduce the amount of data loaded from disk.
        if (this.joinedHeap != null) {
          boolean mayHaveData = joinedHeapMayHaveData(current);
          if (mayHaveData) {
            joinedContinuationRow = current;
            populateFromJoinedHeap(results, scannerContext);

            if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
              return true;
            }
          }
        }
      } else {
        // Populating from the joined heap was stopped by limits, populate some more.
        populateFromJoinedHeap(results, scannerContext);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          return true;
        }
      }
      // We may have just called populateFromJoinedHeap and hit the limits. If that is
      // the case, we need to call it again on the next next() invocation.
      if (joinedContinuationRow != null) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // Finally, we are done with both the joinedHeap and the storeHeap.
      // Double check to prevent empty rows from appearing in the result. It could be
      // the case when SingleColumnValueExcludeFilter is used.
      if (results.isEmpty()) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        boolean moreRows = nextRow(scannerContext, current);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        if (!shouldStop) {
          // We check the size limit because we might have read blocks in the nextRow call above
          // or in the populateResult call. Only scans with hasFilterRow should reach this point,
          // and for those scans which filter row _cells_ this is the only place we can actually
          // enforce that the scan does not exceed limits, since it bypasses all other checks
          // above.
          if (scannerContext.checkSizeLimit(limitScope)) {
            return true;
          }
          continue;
        }
      }

      if (shouldStop) {
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      } else {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }
    }
  }

  private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
    region.filteredReadRequestsCount.increment();
    if (region.getMetrics() != null) {
      region.getMetrics().updateFilteredRecords();
    }

    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
  }

  private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();
  }

  /** Returns true when the joined heap may have data for the current row */
  private boolean joinedHeapMayHaveData(ExtendedCell currentRowCell) throws IOException {
    Cell nextJoinedKv = joinedHeap.peek();
    boolean matchCurrentRow =
      nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell);
    boolean matchAfterSeek = false;

    // If the next value in the joined heap does not match the current row, try to seek to the
    // correct row
    if (!matchCurrentRow) {
      ExtendedCell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell);
      boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
      matchAfterSeek = seekSuccessful && joinedHeap.peek() != null
        && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell);
    }

    return matchCurrentRow || matchAfterSeek;
  }

  /**
   * This function maintains backward compatibility for 0.94 filters. HBASE-6429 combines the
   * filterRow() and filterRow({@code List<KeyValue> kvs}) functions. Code from 0.94 or older may
   * not implement hasFilterRow() as HBASE-6429 expects, because in 0.94 hasFilterRow() only
   * returned true when filterRow({@code List<KeyValue> kvs}) was overridden, not filterRow(). In
   * that case filterRow() would be skipped, so we call it here.
   */
  private boolean filterRow() throws IOException {
    // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
    // filterRowCells(List<Cell> kvs) so we skip that scenario here.
    return filter != null && (!filter.hasFilterRow()) && filter.filterRow();
  }

  private boolean filterRowKey(Cell current) throws IOException {
    return filter != null && filter.filterRowKey(current);
  }

  /**
   * A mocked list implementation - discards all updates.
   */
  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {

    @Override
    public void add(int index, Cell element) {
      // do nothing
    }

    @Override
    public boolean addAll(int index, Collection<? extends Cell> c) {
      return false; // this list is never changed as a result of an update
    }

    @Override
    public KeyValue get(int index) {
      throw new UnsupportedOperationException();
    }

    @Override
    public int size() {
      return 0;
    }
  };

  protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
    assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";

    // Enable skipping row mode, which disables limits and skips tracking progress for all
    // but block size. We keep tracking block size because skipping a row in this way
    // might involve reading blocks along the way.
    scannerContext.setSkippingRow(true);

    Cell next;
    while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();
      this.storeHeap.next(MOCKED_LIST, scannerContext);
    }

    scannerContext.setSkippingRow(false);
    resetFilters();

    // Calling the hook in the CP which allows it to do a fast forward
    return this.region.getCoprocessorHost() == null
      || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
  }

  protected boolean shouldStop(Cell currentRowCell) {
    if (currentRowCell == null) {
      return true;
    }
    if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {
      return false;
    }
    int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length);
    return c > 0 || (c == 0 && !includeStopRow);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "this method is only called inside close which is synchronized")
  private void closeInternal() {
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
    }
    if (joinedHeap != null) {
      joinedHeap.close();
      joinedHeap = null;
    }
    // no need to synchronize here.
    scannerReadPoints.remove(this);
    this.filterClosed = true;
  }

  @Override
  public synchronized void close() {
    TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
  }

  @Override
  public synchronized boolean reseek(byte[] row) throws IOException {
    return TraceUtil.trace(() -> {
      if (row == null) {
        throw new IllegalArgumentException("Row cannot be null.");
      }
      boolean result = false;
      region.startRegionOperation();
      ExtendedCell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
      try {
        // use request seek to make use of the lazy seek option. See HBASE-5520
        result = this.storeHeap.requestSeek(kv, true, true);
        if (this.joinedHeap != null) {
          result = this.joinedHeap.requestSeek(kv, true, true) || result;
        }
      } finally {
        region.closeRegionOperation();
      }
      return result;
    }, () -> region.createRegionSpan("RegionScanner.reseek"));
  }

  @Override
  public void shipped() throws IOException {
    if (storeHeap != null) {
      storeHeap.shipped();
    }
    if (joinedHeap != null) {
      joinedHeap.shipped();
    }
  }

  @Override
  public void run() throws IOException {
    // This is the RPC callback method that gets executed. We do the close of the scanner in
    // this callback.
    this.close();
  }
}