/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
 */
@InterfaceAudience.Private
class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {

  private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class);

  // Package local for testability
  KeyValueHeap storeHeap = null;

  /**
   * Heap of key-values that are not essential for the provided filters and are thus read on demand,
   * if on-demand column family loading is enabled.
   */
  KeyValueHeap joinedHeap = null;

  /**
   * If the joined heap data gathering is interrupted due to scan limits, this will contain the row
   * for which we are populating the values.
   */
  protected ExtendedCell joinedContinuationRow = null;
  private boolean filterClosed = false;

  protected final byte[] stopRow;
  protected final boolean includeStopRow;
  protected final HRegion region;
  protected final CellComparator comparator;

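  /**
   * This scanner's entry lives in the region-wide map so that the region's smallest read point
   * calculation (see getSmallestReadPoint) accounts for every open scanner.
   */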
  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

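  /** The mvcc read point this scanner reads at; cells newer than this point are not visible. */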
  private final long readPt;
  private final long maxResultSize;
  private final ScannerContext defaultScannerContext;
  private final FilterWrapper filter;
  private final String operationId;

  private RegionServerServices rsServices;

  @Override
  public RegionInfo getRegionInfo() {
    return region.getRegionInfo();
  }

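  /**
   * Returns true if the operation carries a real nonce and this server has a nonce manager, in
   * which case the mvcc read point of the originating operation can be looked up from its context.
   */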
  private static boolean hasNonce(HRegion region, long nonce) {
    RegionServerServices rsServices = region.getRegionServerServices();
    return nonce != HConstants.NO_NONCE && rsServices != null
      && rsServices.getNonceManager() != null;
  }

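  /**
   * @param additionalScanners pre-constructed scanners to merge into the store heap (may be null)
   * @param nonceGroup nonce group of the originating operation; together with {@code nonce} it
   *          allows the scanner to reuse the mvcc read point recorded for that operation
   */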
  RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
    long nonceGroup, long nonce) throws IOException {
    this.region = region;
    this.maxResultSize = scan.getMaxResultSize();
    if (scan.hasFilter()) {
      this.filter = new FilterWrapper(scan.getFilter());
    } else {
      this.filter = null;
    }
    this.comparator = region.getCellComparator();
    /**
     * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
     * scanner context that can be used to enforce the batch limit in the event that a
     * ScannerContext is not specified during an invocation of next/nextRaw
     */
    defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
    this.stopRow = scan.getStopRow();
    this.includeStopRow = scan.includeStopRow();
    this.operationId = scan.getId();

    // Synchronize on scannerReadPoints so that nobody calculates getSmallestReadPoint before
    // scannerReadPoints is updated.
    IsolationLevel isolationLevel = scan.getIsolationLevel();
    long mvccReadPoint = ClientInternalHelper.getMvccReadPoint(scan);
    this.scannerReadPoints = region.scannerReadPoints;
    this.rsServices = region.getRegionServerServices();
    region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    try {
      if (mvccReadPoint > 0) {
        this.readPt = mvccReadPoint;
      } else if (hasNonce(region, nonce)) {
        this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
      } else {
        this.readPt = region.getReadPoint(isolationLevel);
      }
      scannerReadPoints.put(this, this.readPt);
    } finally {
      region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
    }
    initializeScanners(scan, additionalScanners);
  }

  private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
    throws IOException {
    // Here we separate all scanners into two lists - scanners that provide data required
    // by the filter to operate (scanners list) and all others (joinedScanners list).
    List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());
    List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());
    // Store all already instantiated scanners for exception handling
    List<KeyValueScanner> instantiatedScanners = new ArrayList<>();
    // handle additionalScanners
    if (additionalScanners != null && !additionalScanners.isEmpty()) {
      scanners.addAll(additionalScanners);
      instantiatedScanners.addAll(additionalScanners);
    }

    try {
      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
        HStore store = region.getStore(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
        instantiatedScanners.add(scanner);
        if (
          this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
            || this.filter.isFamilyEssential(entry.getKey())
        ) {
          scanners.add(scanner);
        } else {
          joinedScanners.add(scanner);
        }
      }
      initializeKVHeap(scanners, joinedScanners, region);
    } catch (Throwable t) {
      throw handleException(instantiatedScanners, t);
    }
  }

  protected void initializeKVHeap(List<KeyValueScanner> scanners,
    List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
    this.storeHeap = new KeyValueHeap(scanners, comparator);
    if (!joinedScanners.isEmpty()) {
      this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
    }
  }

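  /**
   * Cleans up after a failure during construction: removes this scanner's read point and closes
   * whatever heaps or scanners were already created, then returns the cause as an IOException.
   */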
  private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) {
    // remove the scanner read point before throwing the exception
    scannerReadPoints.remove(this);
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
      if (joinedHeap != null) {
        joinedHeap.close();
        joinedHeap = null;
      }
    } else {
      // close all already instantiated scanners before throwing the exception
      for (KeyValueScanner scanner : instantiatedScanners) {
        scanner.close();
      }
    }
    return t instanceof IOException ? (IOException) t : new IOException(t);
  }

  @Override
  public long getMaxResultSize() {
    return maxResultSize;
  }

  @Override
  public long getMvccReadPoint() {
    return this.readPt;
  }

  @Override
  public int getBatch() {
    return this.defaultScannerContext.getBatchLimit();
  }

  @Override
  public String getOperationId() {
    return operationId;
  }

  /**
   * Reset the filter, if one is set on this scanner.
   * @throws IOException in case a filter raises an I/O exception.
   */
  protected final void resetFilters() throws IOException {
    if (filter != null) {
      filter.reset();
    }
  }

  @Override
  public boolean next(List<? super ExtendedCell> outResults) throws IOException {
    // apply the batching limit by default
    return next(outResults, defaultScannerContext);
  }

  @Override
  public synchronized boolean next(List<? super ExtendedCell> outResults,
    ScannerContext scannerContext) throws IOException {
    if (this.filterClosed) {
      throw new UnknownScannerException("Scanner was closed (timed out?) "
        + "after we renewed it. Could be caused by a very slow scanner "
        + "or a lengthy garbage collection");
    }
    region.startRegionOperation(Operation.SCAN);
    try {
      return nextRaw(outResults, scannerContext);
    } finally {
      region.closeRegionOperation(Operation.SCAN);
    }
  }

  @Override
  public boolean nextRaw(List<? super ExtendedCell> outResults) throws IOException {
    // Use the RegionScanner's context by default
    return nextRaw(outResults, defaultScannerContext);
  }

  @Override
  public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext scannerContext)
    throws IOException {
    if (storeHeap == null) {
      // scanner is closed
      throw new UnknownScannerException("Scanner was closed");
    }
    boolean moreValues = false;
    if (outResults.isEmpty()) {
      // Usually outResults is empty. This is true when next is called
      // to handle a scan or get operation.
      moreValues = nextInternal(outResults, scannerContext);
    } else {
      List<ExtendedCell> tmpList = new ArrayList<>();
      moreValues = nextInternal(tmpList, scannerContext);
      outResults.addAll(tmpList);
    }

    region.addReadRequestsCount(1);
    if (region.getMetrics() != null) {
      region.getMetrics().updateReadRequestCount();
    }

    // If the size limit was reached it means a partial Result is being returned. Returning a
    // partial Result means that we should not reset the filters; filters should only be reset in
    // between rows
    if (!scannerContext.mayHaveMoreCellsInRow()) {
      resetFilters();
    }

    if (isFilterDoneInternal()) {
      moreValues = false;
    }
    return moreValues;
  }

  /** Returns true if more cells exist after this batch, false if scanner is done */
  private boolean populateFromJoinedHeap(List<? super ExtendedCell> results,
    ScannerContext scannerContext) throws IOException {
    assert joinedContinuationRow != null;
    boolean moreValues =
      populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow);

    if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
      // We are done with this row, reset the continuation.
      joinedContinuationRow = null;
    }
    // As the data is obtained from two independent heaps, we need to
    // ensure that result list is sorted, because Result relies on that.
    ((List<Cell>) results).sort(comparator);
    return moreValues;
  }

  /**
   * Fetches records with currentRow into the results list, until the next row, batchLimit (if not
   * -1), or remainingResultSize (if not -1) is reached.
   * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before
   *             the call.
   * @return state of last call to {@link KeyValueHeap#next()}
   */
  private boolean populateResult(List<? super ExtendedCell> results, KeyValueHeap heap,
    ScannerContext scannerContext, ExtendedCell currentRowCell) throws IOException {
    Cell nextKv;
    boolean moreCellsInRow = false;
    boolean tmpKeepProgress = scannerContext.getKeepProgress();
    // Scanning between column families and thus the scope is between cells
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;
    do {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // We want to maintain any progress that is made towards the limits while scanning across
      // different column families. To do this, we toggle the keep progress flag on during calls
      // to the StoreScanner to ensure that any progress made thus far is not wiped away.
      scannerContext.setKeepProgress(true);
      heap.next(results, scannerContext);
      scannerContext.setKeepProgress(tmpKeepProgress);

      nextKv = heap.peek();
      moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
      if (!moreCellsInRow) {
        incrementCountOfRowsScannedMetric(scannerContext);
      }
      if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
        return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
      } else if (scannerContext.checkSizeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      } else if (scannerContext.checkTimeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      }
    } while (moreCellsInRow);
    return nextKv != null;
  }

  /**
   * Based on the nextKv in the heap and the current row, decide whether or not there are more
   * cells to be read in the heap. If the row of the nextKv in the heap matches the current row
   * then there are more cells to be read in the row.
   * @return true When there are more cells in the row to be read
   */
  private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
    return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);
  }

  /** Returns true if a filter has ruled that the scan is done. */
  @Override
  public synchronized boolean isFilterDone() throws IOException {
    return isFilterDoneInternal();
  }

  private boolean isFilterDoneInternal() throws IOException {
    return this.filter != null && this.filter.filterAllRemaining();
  }

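  /**
   * Throws CallerDisconnectedException if the client that issued this scan has already
   * disconnected, so the server does not keep scanning on behalf of a caller that went away.
   */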
  private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException {
    if (rpcCall.isPresent()) {
      // If a user specifies a too-restrictive or too-slow scanner, the
      // client might time out and disconnect while the server side
      // is still processing the request. We should abort aggressively
      // in that case.
      long afterTime = rpcCall.get().disconnectSince();
      if (afterTime >= 0) {
        throw new CallerDisconnectedException(
          "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this
            + " after " + afterTime + " ms, since " + "caller disconnected");
      }
    }
  }

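  /**
   * Rewinds the scanner progress to the values captured at the start of nextInternal, or clears it
   * entirely, depending on whether the context was configured to keep progress between calls.
   */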
  private void resetProgress(ScannerContext scannerContext, int initialBatchProgress,
    long initialSizeProgress, long initialHeapSizeProgress) {
    // Starting to scan a new row. Reset the scanner progress according to whether or not
    // progress should be kept.
    if (scannerContext.getKeepProgress()) {
      // Progress should be kept. Reset to initial values seen at start of method invocation.
      scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
    } else {
      scannerContext.clearProgress();
    }
  }

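  /**
   * Core row-gathering loop: reads the current row from the store heap, applies filters, and then
   * optionally fills in non-essential column families from the joined heap. Returns true if the
   * scanner may still have more rows to return.
   */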
  private boolean nextInternal(List<? super ExtendedCell> results, ScannerContext scannerContext)
    throws IOException {
    Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list");
    Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null");
    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();

    // Save the initial progress from the Scanner context in these local variables. The progress
    // may need to be reset a few times if rows are being filtered out, so we save the initial
    // progress.
    int initialBatchProgress = scannerContext.getBatchProgress();
    long initialSizeProgress = scannerContext.getDataSizeProgress();
    long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();

    // Used to check time limit
    LimitScope limitScope = LimitScope.BETWEEN_CELLS;

    // The loop here is used only when, at some point during the next, we determine that due to
    // effects of filters or otherwise we have an empty row in the result. Then we loop and try
    // again. Otherwise, we must get out on the first iteration via return: "true" if there's more
    // data to read, "false" if there isn't (the storeHeap is at a stop row, and the joinedHeap has
    // no more data to read for the last row, joinedContinuationRow, if set).
    while (true) {
      resetProgress(scannerContext, initialBatchProgress, initialSizeProgress,
        initialHeapSizeProgress);
      checkClientDisconnect(rpcCall);

      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();

      // Let's see what we have in the storeHeap.
      ExtendedCell current = this.storeHeap.peek();

      boolean shouldStop = shouldStop(current);
      // When hasFilterRow is true it means that all the cells for a particular row must be read
      // before a filtering decision can be made. This means that filters where hasFilterRow is
      // true run the risk of encountering out of memory errors in the case that they are applied
      // to a table that has very large rows.
      boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

      // If filter#hasFilterRow is true, partial results are not allowed since allowing them
      // would prevent the filters from being evaluated. Thus, if it is true, change the
      // scope of any limits that could potentially create partial results to
      // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row
      if (hasFilterRow) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
            + "formed. Changing scope of limits that may create partials");
        }
        scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
        scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
        limitScope = LimitScope.BETWEEN_ROWS;
      }

      if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
        if (hasFilterRow) {
          throw new IncompatibleFilterException(
            "Filter whose hasFilterRow() returns true is incompatible with scans that must "
              + "stop mid-row because of a limit. ScannerContext:" + scannerContext);
        }
        return true;
      }

      // Check if we were getting data from the joinedHeap and hit the limit.
      // If not, then it's the main path - getting results from the storeHeap.
      if (joinedContinuationRow == null) {
        // First, check if we are at a stop row. If so, there are no more results.
        if (shouldStop) {
          if (hasFilterRow) {
            filter.filterRowCells((List<Cell>) results);
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Check if the rowkey filter wants to exclude this row. If so, loop to next.
        // Technically, if we hit limits before on this row, we don't need this call.
        if (filterRowKey(current)) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          // early check, see HBASE-16296
          if (isFilterDoneInternal()) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          // Typically the count of rows scanned is incremented inside #populateResult. However,
          // here we are filtering a row based purely on its row key, preventing us from calling
          // #populateResult. Thus, perform the necessary increment here to the rows scanned metric
          incrementCountOfRowsScannedMetric(scannerContext);
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          results.clear();

          // Read nothing as the rowkey was filtered, but still need to check the time limit.
          // We also check the size limit because we might have read blocks in getting to this
          // point.
          if (scannerContext.checkAnyLimitReached(limitScope)) {
            return true;
          }
          continue;
        }

        // Ok, we are good, let's try to get some results from the main heap.
        populateResult(results, this.storeHeap, scannerContext, current);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          if (hasFilterRow) {
            throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                + "stop mid-row because of a limit. ScannerContext:" + scannerContext);
          }
          return true;
        }

        // Check for thread interrupt status in case we have been signaled from
        // #interruptRegionOperation.
        region.checkInterrupt();

        Cell nextKv = this.storeHeap.peek();
        shouldStop = shouldStop(nextKv);
        // Save whether the row was empty before the filters were applied to it.
        final boolean isEmptyRow = results.isEmpty();

        // We have the part of the row necessary for filtering (all of it, usually).
        // First filter with the filterRow(List).
        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
        if (hasFilterRow) {
          ret = filter.filterRowCellsWithRet((List<Cell>) results);

          // We don't know how the results have changed after being filtered. Must set progress
          // according to contents of results now.
          if (scannerContext.getKeepProgress()) {
            scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
              initialHeapSizeProgress);
          } else {
            scannerContext.clearProgress();
          }
          scannerContext.incrementBatchProgress(results.size());
          for (ExtendedCell cell : (List<ExtendedCell>) results) {
            scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
              cell.heapSize());
          }
        }

        if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
          incrementCountOfRowsFilteredMetric(scannerContext);
          results.clear();
          boolean moreRows = nextRow(scannerContext, current);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // This row was totally filtered out. If this is NOT the last row,
          // we should continue on. Otherwise, nothing else to do.
          if (!shouldStop) {
            // Read nothing as the cells were filtered, but still need to check the time limit.
            // We also check the size limit because we might have read blocks in getting to this
            // point.
            if (scannerContext.checkAnyLimitReached(limitScope)) {
              return true;
            }
            continue;
          }
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Ok, we are done with the storeHeap for this row.
        // Now we may need to fetch additional, non-essential data into the row.
        // These values are not needed for the filter to work, so we postpone their
        // fetch to (possibly) reduce the amount of data loaded from disk.
        if (this.joinedHeap != null) {
          boolean mayHaveData = joinedHeapMayHaveData(current);
          if (mayHaveData) {
            joinedContinuationRow = current;
            populateFromJoinedHeap(results, scannerContext);

            if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
              return true;
            }
          }
        }
      } else {
        // Populating from the joined heap was stopped by limits, populate some more.
        populateFromJoinedHeap(results, scannerContext);
        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
          return true;
        }
      }
      // We may have just called populateFromJoinedHeap and hit the limits. If that is
      // the case, we need to call it again on the next next() invocation.
      if (joinedContinuationRow != null) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // Finally, we are done with both the joinedHeap and the storeHeap.
      // Double check to prevent empty rows from appearing in the result. It could be
      // the case when SingleColumnValueExcludeFilter is used.
      if (results.isEmpty()) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        boolean moreRows = nextRow(scannerContext, current);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        if (!shouldStop) {
          // We check the size limit because we might have read blocks in the nextRow call above,
          // or in the populateResult call. Only scans with hasFilterRow should reach this point,
          // and for those scans which filter row _cells_ this is the only place we can actually
          // enforce that the scan does not exceed limits since it bypasses all other checks above.
          if (scannerContext.checkSizeLimit(limitScope)) {
            return true;
          }
          continue;
        }
      }

      if (shouldStop) {
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      } else {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }
    }
  }

  private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
    region.filteredReadRequestsCount.increment();
    if (region.getMetrics() != null) {
      region.getMetrics().updateFilteredRecords();
    }

    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
  }

  private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
      return;
    }

    scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();
  }

  /** Returns true when the joined heap may have data for the current row */
  private boolean joinedHeapMayHaveData(ExtendedCell currentRowCell) throws IOException {
    Cell nextJoinedKv = joinedHeap.peek();
    boolean matchCurrentRow =
      nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell);
    boolean matchAfterSeek = false;

    // If the next value in the joined heap does not match the current row, try to seek to the
    // correct row
    if (!matchCurrentRow) {
      ExtendedCell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell);
      boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
      matchAfterSeek = seekSuccessful && joinedHeap.peek() != null
        && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell);
    }

    return matchCurrentRow || matchAfterSeek;
  }

  /**
   * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combined the
   * filterRow() and filterRow({@code List<KeyValue> kvs}) functions. Code from 0.94 or older may
   * not implement hasFilterRow as HBASE-6429 expects, because in 0.94 hasFilterRow() only returns
   * true when filterRow({@code List<KeyValue> kvs}) is overridden, not filterRow(). In that case,
   * filterRow() would be skipped.
   */
  private boolean filterRow() throws IOException {
    // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
    // filterRowCells(List<Cell> kvs) so we skip that scenario here.
    return filter != null && (!filter.hasFilterRow()) && filter.filterRow();
  }

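  /** Returns true if the filter excludes the whole row based on its row key alone. */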
  private boolean filterRowKey(Cell current) throws IOException {
    return filter != null && filter.filterRowKey(current);
  }

  /**
   * A mocked list implementation - discards all updates.
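   * Used by nextRow to drain the remaining cells of a row without retaining them.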
   */
  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {

    @Override
    public void add(int index, Cell element) {
      // do nothing
    }

    @Override
    public boolean addAll(int index, Collection<? extends Cell> c) {
      return false; // this list is never changed as a result of an update
    }

    @Override
    public KeyValue get(int index) {
      throw new UnsupportedOperationException();
    }

    @Override
    public int size() {
      return 0;
    }
  };

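  /**
   * Advances the store heap past all remaining cells of the current row, discarding them, then
   * resets the filters. Returns false if a coprocessor's postScannerFilterRow hook indicates that
   * no more rows are available for this scanner.
   */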
  protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
    assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";

    // Enable skipping row mode, which disables limits and skips tracking progress for all
    // but block size. We keep tracking block size because skipping a row in this way
    // might involve reading blocks along the way.
    scannerContext.setSkippingRow(true);

    Cell next;
    while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
      // Check for thread interrupt status in case we have been signaled from
      // #interruptRegionOperation.
      region.checkInterrupt();
      this.storeHeap.next(MOCKED_LIST, scannerContext);
    }

    scannerContext.setSkippingRow(false);
    resetFilters();

    // Call the hook in the coprocessor, which allows it to do a fast forward
    return this.region.getCoprocessorHost() == null
      || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
  }

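  /**
   * Returns true if there is no current row (the heap is exhausted) or the current row is past the
   * configured stop row, honoring includeStopRow for the boundary row itself.
   */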
  protected boolean shouldStop(Cell currentRowCell) {
    if (currentRowCell == null) {
      return true;
    }
    if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {
      return false;
    }
    int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length);
    return c > 0 || (c == 0 && !includeStopRow);
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "this method is only called inside close which is synchronized")
  private void closeInternal() {
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
    }
    if (joinedHeap != null) {
      joinedHeap.close();
      joinedHeap = null;
    }
    // no need to synchronize here.
    scannerReadPoints.remove(this);
    this.filterClosed = true;
  }

  @Override
  public synchronized void close() {
    TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
  }

  @Override
  public synchronized boolean reseek(byte[] row) throws IOException {
    return TraceUtil.trace(() -> {
      if (row == null) {
        throw new IllegalArgumentException("Row cannot be null.");
      }
      boolean result = false;
      region.startRegionOperation();
      ExtendedCell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
      try {
        // use request seek to make use of the lazy seek option. See HBASE-5520
        result = this.storeHeap.requestSeek(kv, true, true);
        if (this.joinedHeap != null) {
          result = this.joinedHeap.requestSeek(kv, true, true) || result;
        }
      } finally {
        region.closeRegionOperation();
      }
      return result;
    }, () -> region.createRegionSpan("RegionScanner.reseek"));
  }

  @Override
  public void shipped() throws IOException {
    if (storeHeap != null) {
      storeHeap.shipped();
    }
    if (joinedHeap != null) {
      joinedHeap.shipped();
    }
  }

  @Override
  public void run() throws IOException {
    // This is the RPC callback method that gets executed; we close the scanner in this callback.
    this.close();
  }
}