001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.AbstractList;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.List;
025import java.util.Map;
026import java.util.NavigableSet;
027import java.util.Optional;
028import java.util.concurrent.ConcurrentHashMap;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellComparator;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.ExtendedCell;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.KeyValue;
035import org.apache.hadoop.hbase.PrivateCellUtil;
036import org.apache.hadoop.hbase.UnknownScannerException;
037import org.apache.hadoop.hbase.client.ClientInternalHelper;
038import org.apache.hadoop.hbase.client.IsolationLevel;
039import org.apache.hadoop.hbase.client.RegionInfo;
040import org.apache.hadoop.hbase.client.Scan;
041import org.apache.hadoop.hbase.filter.FilterWrapper;
042import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
043import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
044import org.apache.hadoop.hbase.ipc.RpcCall;
045import org.apache.hadoop.hbase.ipc.RpcCallback;
046import org.apache.hadoop.hbase.ipc.RpcServer;
047import org.apache.hadoop.hbase.regionserver.Region.Operation;
048import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
049import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
050import org.apache.hadoop.hbase.trace.TraceUtil;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.yetus.audience.InterfaceAudience;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
057
058/**
059 * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
060 */
061@InterfaceAudience.Private
062class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
063
  private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class);

  // Package local for testability
  // Heap over the scanners that are read first for every row; set back to null when the scanner
  // is closed.
  KeyValueHeap storeHeap = null;

  /**
   * Heap of key-values that are not essential for the provided filters and are thus read on demand,
   * if on-demand column family loading is enabled.
   */
  KeyValueHeap joinedHeap = null;

  /**
   * If the joined heap data gathering is interrupted due to scan limits, this will contain the row
   * for which we are populating the values.
   */
  protected ExtendedCell joinedContinuationRow = null;
  // Set to true by closeInternal(); next(List, ScannerContext) refuses to run once it is set.
  private boolean filterClosed = false;

  // Stop row of the scan and whether that row itself is part of the result; see shouldStop(Cell).
  protected final byte[] stopRow;
  protected final boolean includeStopRow;
  // Region being scanned and the cell comparator obtained from it.
  protected final HRegion region;
  protected final CellComparator comparator;

  // The region's scanner-to-read-point map; this scanner registers itself in the constructor and
  // removes itself on close or on a failed initialization.
  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  // Read point chosen in the constructor and passed to every store scanner.
  private final long readPt;
  private final long maxResultSize;
  // Context used when the caller of next/nextRaw does not supply one; carries the batch limit.
  private final ScannerContext defaultScannerContext;
  // Wrapper around the scan's filter, or null when the scan has no filter.
  private final FilterWrapper filter;
  private final String operationId;

  private RegionServerServices rsServices;
096
097  @Override
098  public RegionInfo getRegionInfo() {
099    return region.getRegionInfo();
100  }
101
102  private static boolean hasNonce(HRegion region, long nonce) {
103    RegionServerServices rsServices = region.getRegionServerServices();
104    return nonce != HConstants.NO_NONCE && rsServices != null
105      && rsServices.getNonceManager() != null;
106  }
107
108  RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
109    long nonceGroup, long nonce) throws IOException {
110    this.region = region;
111    this.maxResultSize = scan.getMaxResultSize();
112    if (scan.hasFilter()) {
113      this.filter = new FilterWrapper(scan.getFilter());
114    } else {
115      this.filter = null;
116    }
117    this.comparator = region.getCellComparator();
118    /**
119     * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
120     * scanner context that can be used to enforce the batch limit in the event that a
121     * ScannerContext is not specified during an invocation of next/nextRaw
122     */
123    defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
124    this.stopRow = scan.getStopRow();
125    this.includeStopRow = scan.includeStopRow();
126    this.operationId = scan.getId();
127
128    // synchronize on scannerReadPoints so that nobody calculates
129    // getSmallestReadPoint, before scannerReadPoints is updated.
130    IsolationLevel isolationLevel = scan.getIsolationLevel();
131    long mvccReadPoint = ClientInternalHelper.getMvccReadPoint(scan);
132    this.scannerReadPoints = region.scannerReadPoints;
133    this.rsServices = region.getRegionServerServices();
134    region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
135    try {
136      if (mvccReadPoint > 0) {
137        this.readPt = mvccReadPoint;
138      } else if (hasNonce(region, nonce)) {
139        this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
140      } else {
141        this.readPt = region.getReadPoint(isolationLevel);
142      }
143      scannerReadPoints.put(this, this.readPt);
144    } finally {
145      region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
146    }
147    initializeScanners(scan, additionalScanners);
148  }
149
150  public ScannerContext getContext() {
151    return defaultScannerContext;
152  }
153
154  private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
155    throws IOException {
156    // Here we separate all scanners into two lists - scanner that provide data required
157    // by the filter to operate (scanners list) and all others (joinedScanners list).
158    List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());
159    List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());
160    // Store all already instantiated scanners for exception handling
161    List<KeyValueScanner> instantiatedScanners = new ArrayList<>();
162    // handle additionalScanners
163    if (additionalScanners != null && !additionalScanners.isEmpty()) {
164      scanners.addAll(additionalScanners);
165      instantiatedScanners.addAll(additionalScanners);
166    }
167
168    try {
169      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
170        HStore store = region.getStore(entry.getKey());
171        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
172        instantiatedScanners.add(scanner);
173        if (
174          this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
175            || this.filter.isFamilyEssential(entry.getKey())
176        ) {
177          scanners.add(scanner);
178        } else {
179          joinedScanners.add(scanner);
180        }
181      }
182      initializeKVHeap(scanners, joinedScanners, region);
183    } catch (Throwable t) {
184      throw handleException(instantiatedScanners, t);
185    }
186  }
187
188  protected void initializeKVHeap(List<KeyValueScanner> scanners,
189    List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
190    this.storeHeap = new KeyValueHeap(scanners, comparator);
191    if (!joinedScanners.isEmpty()) {
192      this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
193    }
194  }
195
196  private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) {
197    // remove scaner read point before throw the exception
198    scannerReadPoints.remove(this);
199    if (storeHeap != null) {
200      storeHeap.close();
201      storeHeap = null;
202      if (joinedHeap != null) {
203        joinedHeap.close();
204        joinedHeap = null;
205      }
206    } else {
207      // close all already instantiated scanners before throwing the exception
208      for (KeyValueScanner scanner : instantiatedScanners) {
209        scanner.close();
210      }
211    }
212    return t instanceof IOException ? (IOException) t : new IOException(t);
213  }
214
215  @Override
216  public long getMaxResultSize() {
217    return maxResultSize;
218  }
219
220  @Override
221  public long getMvccReadPoint() {
222    return this.readPt;
223  }
224
225  @Override
226  public int getBatch() {
227    return this.defaultScannerContext.getBatchLimit();
228  }
229
230  @Override
231  public String getOperationId() {
232    return operationId;
233  }
234
235  /**
236   * Reset both the filter and the old filter.
237   * @throws IOException in case a filter raises an I/O exception.
238   */
239  protected final void resetFilters() throws IOException {
240    if (filter != null) {
241      filter.reset();
242    }
243  }
244
245  @Override
246  public boolean next(List<? super ExtendedCell> outResults) throws IOException {
247    // apply the batching limit by default
248    return next(outResults, defaultScannerContext);
249  }
250
251  @Override
252  public synchronized boolean next(List<? super ExtendedCell> outResults,
253    ScannerContext scannerContext) throws IOException {
254    if (this.filterClosed) {
255      throw new UnknownScannerException("Scanner was closed (timed out?) "
256        + "after we renewed it. Could be caused by a very slow scanner "
257        + "or a lengthy garbage collection");
258    }
259    region.startRegionOperation(Operation.SCAN);
260    try {
261      return nextRaw(outResults, scannerContext);
262    } finally {
263      region.closeRegionOperation(Operation.SCAN);
264    }
265  }
266
267  @Override
268  public boolean nextRaw(List<? super ExtendedCell> outResults) throws IOException {
269    // Use the RegionScanner's context by default
270    return nextRaw(outResults, defaultScannerContext);
271  }
272
273  @Override
274  public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext scannerContext)
275    throws IOException {
276    if (storeHeap == null) {
277      // scanner is closed
278      throw new UnknownScannerException("Scanner was closed");
279    }
280    boolean moreValues = false;
281    if (outResults.isEmpty()) {
282      // Usually outResults is empty. This is true when next is called
283      // to handle scan or get operation.
284      moreValues = nextInternal(outResults, scannerContext);
285    } else {
286      List<ExtendedCell> tmpList = new ArrayList<>();
287      moreValues = nextInternal(tmpList, scannerContext);
288      outResults.addAll(tmpList);
289    }
290
291    region.addReadRequestsCount(1);
292    if (region.getMetrics() != null) {
293      region.getMetrics().updateReadRequestCount();
294    }
295
296    // If the size limit was reached it means a partial Result is being returned. Returning a
297    // partial Result means that we should not reset the filters; filters should only be reset in
298    // between rows
299    if (!scannerContext.mayHaveMoreCellsInRow()) {
300      resetFilters();
301    }
302
303    if (isFilterDoneInternal()) {
304      moreValues = false;
305    }
306    return moreValues;
307  }
308
309  /** Returns true if more cells exist after this batch, false if scanner is done */
310  private boolean populateFromJoinedHeap(List<? super ExtendedCell> results,
311    ScannerContext scannerContext) throws IOException {
312    assert joinedContinuationRow != null;
313    boolean moreValues =
314      populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow);
315
316    if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
317      // We are done with this row, reset the continuation.
318      joinedContinuationRow = null;
319    }
320    // As the data is obtained from two independent heaps, we need to
321    // ensure that result list is sorted, because Result relies on that.
322    ((List<Cell>) results).sort(comparator);
323    return moreValues;
324  }
325
326  /**
327   * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is
328   * reached, or remainingResultSize (if not -1) is reaced
329   * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.
330   * @return state of last call to {@link KeyValueHeap#next()}
331   */
332  private boolean populateResult(List<? super ExtendedCell> results, KeyValueHeap heap,
333    ScannerContext scannerContext, ExtendedCell currentRowCell) throws IOException {
334    Cell nextKv;
335    boolean moreCellsInRow = false;
336    boolean tmpKeepProgress = scannerContext.getKeepProgress();
337    // Scanning between column families and thus the scope is between cells
338    LimitScope limitScope = LimitScope.BETWEEN_CELLS;
339    do {
340      // Check for thread interrupt status in case we have been signaled from
341      // #interruptRegionOperation.
342      region.checkInterrupt();
343
344      // We want to maintain any progress that is made towards the limits while scanning across
345      // different column families. To do this, we toggle the keep progress flag on during calls
346      // to the StoreScanner to ensure that any progress made thus far is not wiped away.
347      scannerContext.setKeepProgress(true);
348      heap.next(results, scannerContext);
349      scannerContext.setKeepProgress(tmpKeepProgress);
350
351      nextKv = heap.peek();
352      moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
353      if (!moreCellsInRow) {
354        incrementCountOfRowsScannedMetric(scannerContext);
355      }
356      if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
357        return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
358      } else if (scannerContext.checkSizeLimit(limitScope)) {
359        ScannerContext.NextState state =
360          moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
361        return scannerContext.setScannerState(state).hasMoreValues();
362      } else if (scannerContext.checkTimeLimit(limitScope)) {
363        ScannerContext.NextState state =
364          moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
365        return scannerContext.setScannerState(state).hasMoreValues();
366      }
367    } while (moreCellsInRow);
368    return nextKv != null;
369  }
370
371  /**
372   * Based on the nextKv in the heap, and the current row, decide whether or not there are more
373   * cells to be read in the heap. If the row of the nextKv in the heap matches the current row then
374   * there are more cells to be read in the row.
375   * @return true When there are more cells in the row to be read
376   */
377  private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
378    return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);
379  }
380
381  /** Returns True if a filter rules the scanner is over, done. */
382  @Override
383  public synchronized boolean isFilterDone() throws IOException {
384    return isFilterDoneInternal();
385  }
386
387  private boolean isFilterDoneInternal() throws IOException {
388    return this.filter != null && this.filter.filterAllRemaining();
389  }
390
391  private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException {
392    if (rpcCall.isPresent()) {
393      // If a user specifies a too-restrictive or too-slow scanner, the
394      // client might time out and disconnect while the server side
395      // is still processing the request. We should abort aggressively
396      // in that case.
397      long afterTime = rpcCall.get().disconnectSince();
398      if (afterTime >= 0) {
399        throw new CallerDisconnectedException(
400          "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this
401            + " after " + afterTime + " ms, since " + "caller disconnected");
402      }
403    }
404  }
405
406  private void resetProgress(ScannerContext scannerContext, int initialBatchProgress,
407    long initialSizeProgress, long initialHeapSizeProgress) {
408    // Starting to scan a new row. Reset the scanner progress according to whether or not
409    // progress should be kept.
410    if (scannerContext.getKeepProgress()) {
411      // Progress should be kept. Reset to initial values seen at start of method invocation.
412      scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
413        initialHeapSizeProgress);
414    } else {
415      scannerContext.clearProgress();
416    }
417  }
418
419  private boolean nextInternal(List<? super ExtendedCell> results, ScannerContext scannerContext)
420    throws IOException {
421    Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list");
422    Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null");
423    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();
424
425    // Save the initial progress from the Scanner context in these local variables. The progress
426    // may need to be reset a few times if rows are being filtered out so we save the initial
427    // progress.
428    int initialBatchProgress = scannerContext.getBatchProgress();
429    long initialSizeProgress = scannerContext.getDataSizeProgress();
430    long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();
431
432    // Used to check time limit
433    LimitScope limitScope = LimitScope.BETWEEN_CELLS;
434
435    // The loop here is used only when at some point during the next we determine
436    // that due to effects of filters or otherwise, we have an empty row in the result.
437    // Then we loop and try again. Otherwise, we must get out on the first iteration via return,
438    // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
439    // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow).
440    while (true) {
441      resetProgress(scannerContext, initialBatchProgress, initialSizeProgress,
442        initialHeapSizeProgress);
443      checkClientDisconnect(rpcCall);
444
445      // Check for thread interrupt status in case we have been signaled from
446      // #interruptRegionOperation.
447      region.checkInterrupt();
448
449      // Let's see what we have in the storeHeap.
450      ExtendedCell current = this.storeHeap.peek();
451
452      boolean shouldStop = shouldStop(current);
453      // When has filter row is true it means that the all the cells for a particular row must be
454      // read before a filtering decision can be made. This means that filters where hasFilterRow
455      // run the risk of enLongAddering out of memory errors in the case that they are applied to a
456      // table that has very large rows.
457      boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();
458
459      // If filter#hasFilterRow is true, partial results are not allowed since allowing them
460      // would prevent the filters from being evaluated. Thus, if it is true, change the
461      // scope of any limits that could potentially create partial results to
462      // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row
463      if (hasFilterRow) {
464        if (LOG.isTraceEnabled()) {
465          LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
466            + " formed. Changing scope of limits that may create partials");
467        }
468        scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
469        scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
470        limitScope = LimitScope.BETWEEN_ROWS;
471      }
472
473      if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
474        if (hasFilterRow) {
475          throw new IncompatibleFilterException(
476            "Filter whose hasFilterRow() returns true is incompatible with scans that must "
477              + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
478        }
479        return true;
480      }
481
482      // Check if we were getting data from the joinedHeap and hit the limit.
483      // If not, then it's main path - getting results from storeHeap.
484      if (joinedContinuationRow == null) {
485        // First, check if we are at a stop row. If so, there are no more results.
486        if (shouldStop) {
487          if (hasFilterRow) {
488            filter.filterRowCells((List<Cell>) results);
489          }
490          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
491        }
492
493        // Check if rowkey filter wants to exclude this row. If so, loop to next.
494        // Technically, if we hit limits before on this row, we don't need this call.
495        if (filterRowKey(current)) {
496          incrementCountOfRowsFilteredMetric(scannerContext);
497          // early check, see HBASE-16296
498          if (isFilterDoneInternal()) {
499            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
500          }
501          // Typically the count of rows scanned is incremented inside #populateResult. However,
502          // here we are filtering a row based purely on its row key, preventing us from calling
503          // #populateResult. Thus, perform the necessary increment here to rows scanned metric
504          incrementCountOfRowsScannedMetric(scannerContext);
505          boolean moreRows = nextRow(scannerContext, current);
506          if (!moreRows) {
507            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
508          }
509          results.clear();
510
511          // Read nothing as the rowkey was filtered, but still need to check time limit
512          // We also check size limit because we might have read blocks in getting to this point.
513          if (scannerContext.checkAnyLimitReached(limitScope)) {
514            return true;
515          }
516          continue;
517        }
518
519        // Ok, we are good, let's try to get some results from the main heap.
520        populateResult(results, this.storeHeap, scannerContext, current);
521        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
522          if (hasFilterRow) {
523            throw new IncompatibleFilterException(
524              "Filter whose hasFilterRow() returns true is incompatible with scans that must "
525                + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
526          }
527          return true;
528        }
529
530        // Check for thread interrupt status in case we have been signaled from
531        // #interruptRegionOperation.
532        region.checkInterrupt();
533
534        Cell nextKv = this.storeHeap.peek();
535        shouldStop = shouldStop(nextKv);
536        // save that the row was empty before filters applied to it.
537        final boolean isEmptyRow = results.isEmpty();
538
539        // We have the part of the row necessary for filtering (all of it, usually).
540        // First filter with the filterRow(List).
541        FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
542        if (hasFilterRow) {
543          ret = filter.filterRowCellsWithRet((List<Cell>) results);
544
545          // We don't know how the results have changed after being filtered. Must set progress
546          // according to contents of results now.
547          if (scannerContext.getKeepProgress()) {
548            scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
549              initialHeapSizeProgress);
550          } else {
551            scannerContext.clearProgress();
552          }
553          scannerContext.incrementBatchProgress(results.size());
554          for (ExtendedCell cell : (List<ExtendedCell>) results) {
555            scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
556              cell.heapSize());
557          }
558        }
559
560        if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
561          incrementCountOfRowsFilteredMetric(scannerContext);
562          results.clear();
563          boolean moreRows = nextRow(scannerContext, current);
564          if (!moreRows) {
565            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
566          }
567
568          // This row was totally filtered out, if this is NOT the last row,
569          // we should continue on. Otherwise, nothing else to do.
570          if (!shouldStop) {
571            // Read nothing as the cells was filtered, but still need to check time limit.
572            // We also check size limit because we might have read blocks in getting to this point.
573            if (scannerContext.checkAnyLimitReached(limitScope)) {
574              return true;
575            }
576            continue;
577          }
578          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
579        }
580
581        // Ok, we are done with storeHeap for this row.
582        // Now we may need to fetch additional, non-essential data into row.
583        // These values are not needed for filter to work, so we postpone their
584        // fetch to (possibly) reduce amount of data loads from disk.
585        if (this.joinedHeap != null) {
586          boolean mayHaveData = joinedHeapMayHaveData(current);
587          if (mayHaveData) {
588            joinedContinuationRow = current;
589            populateFromJoinedHeap(results, scannerContext);
590
591            if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
592              return true;
593            }
594          }
595        }
596      } else {
597        // Populating from the joined heap was stopped by limits, populate some more.
598        populateFromJoinedHeap(results, scannerContext);
599        if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
600          return true;
601        }
602      }
603      // We may have just called populateFromJoinedMap and hit the limits. If that is
604      // the case, we need to call it again on the next next() invocation.
605      if (joinedContinuationRow != null) {
606        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
607      }
608
609      // Finally, we are done with both joinedHeap and storeHeap.
610      // Double check to prevent empty rows from appearing in result. It could be
611      // the case when SingleColumnValueExcludeFilter is used.
612      if (results.isEmpty()) {
613        incrementCountOfRowsFilteredMetric(scannerContext);
614        boolean moreRows = nextRow(scannerContext, current);
615        if (!moreRows) {
616          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
617        }
618        if (!shouldStop) {
619          // We check size limit because we might have read blocks in the nextRow call above, or
620          // in the call populateResults call. Only scans with hasFilterRow should reach this point,
621          // and for those scans which filter row _cells_ this is the only place we can actually
622          // enforce that the scan does not exceed limits since it bypasses all other checks above.
623          if (scannerContext.checkSizeLimit(limitScope)) {
624            return true;
625          }
626          continue;
627        }
628      }
629
630      if (shouldStop) {
631        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
632      } else {
633        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
634      }
635    }
636  }
637
638  private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
639    region.filteredReadRequestsCount.increment();
640    if (region.getMetrics() != null) {
641      region.getMetrics().updateFilteredRecords();
642    }
643
644    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
645      return;
646    }
647
648    scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
649  }
650
651  private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
652    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
653      return;
654    }
655
656    scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();
657  }
658
659  /** Returns true when the joined heap may have data for the current row */
660  private boolean joinedHeapMayHaveData(ExtendedCell currentRowCell) throws IOException {
661    Cell nextJoinedKv = joinedHeap.peek();
662    boolean matchCurrentRow =
663      nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell);
664    boolean matchAfterSeek = false;
665
666    // If the next value in the joined heap does not match the current row, try to seek to the
667    // correct row
668    if (!matchCurrentRow) {
669      ExtendedCell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell);
670      boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
671      matchAfterSeek = seekSuccessful && joinedHeap.peek() != null
672        && CellUtil.matchingRows(joinedHeap.peek(), currentRowCell);
673    }
674
675    return matchCurrentRow || matchAfterSeek;
676  }
677
678  /**
679   * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines both
680   * filterRow & filterRow({@code List<KeyValue> kvs}) functions. While 0.94 code or older, it may
681   * not implement hasFilterRow as HBase-6429 expects because 0.94 hasFilterRow() only returns true
682   * when filterRow({@code List<KeyValue> kvs}) is overridden not the filterRow(). Therefore, the
683   * filterRow() will be skipped.
684   */
685  private boolean filterRow() throws IOException {
686    // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
687    // filterRowCells(List<Cell> kvs) so we skip that scenario here.
688    return filter != null && (!filter.hasFilterRow()) && filter.filterRow();
689  }
690
691  private boolean filterRowKey(Cell current) throws IOException {
692    return filter != null && filter.filterRowKey(current);
693  }
694
695  /**
696   * A mocked list implementation - discards all updates.
697   */
698  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
699
700    @Override
701    public void add(int index, Cell element) {
702      // do nothing
703    }
704
705    @Override
706    public boolean addAll(int index, Collection<? extends Cell> c) {
707      return false; // this list is never changed as a result of an update
708    }
709
710    @Override
711    public KeyValue get(int index) {
712      throw new UnsupportedOperationException();
713    }
714
715    @Override
716    public int size() {
717      return 0;
718    }
719  };
720
721  protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
722    assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
723
724    // Enable skipping row mode, which disables limits and skips tracking progress for all
725    // but block size. We keep tracking block size because skipping a row in this way
726    // might involve reading blocks along the way.
727    scannerContext.setSkippingRow(true);
728
729    Cell next;
730    while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
731      // Check for thread interrupt status in case we have been signaled from
732      // #interruptRegionOperation.
733      region.checkInterrupt();
734      this.storeHeap.next(MOCKED_LIST, scannerContext);
735    }
736
737    scannerContext.setSkippingRow(false);
738    resetFilters();
739
740    // Calling the hook in CP which allows it to do a fast forward
741    return this.region.getCoprocessorHost() == null
742      || this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
743  }
744
745  protected boolean shouldStop(Cell currentRowCell) {
746    if (currentRowCell == null) {
747      return true;
748    }
749    if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {
750      return false;
751    }
752    int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length);
753    return c > 0 || (c == 0 && !includeStopRow);
754  }
755
756  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
757      justification = "this method is only called inside close which is synchronized")
758  private void closeInternal() {
759    if (storeHeap != null) {
760      storeHeap.close();
761      storeHeap = null;
762    }
763    if (joinedHeap != null) {
764      joinedHeap.close();
765      joinedHeap = null;
766    }
767    // no need to synchronize here.
768    scannerReadPoints.remove(this);
769    this.filterClosed = true;
770  }
771
772  @Override
773  public synchronized void close() {
774    TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
775  }
776
777  @Override
778  public synchronized boolean reseek(byte[] row) throws IOException {
779    return TraceUtil.trace(() -> {
780      if (row == null) {
781        throw new IllegalArgumentException("Row cannot be null.");
782      }
783      boolean result = false;
784      region.startRegionOperation();
785      ExtendedCell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
786      try {
787        // use request seek to make use of the lazy seek option. See HBASE-5520
788        result = this.storeHeap.requestSeek(kv, true, true);
789        if (this.joinedHeap != null) {
790          result = this.joinedHeap.requestSeek(kv, true, true) || result;
791        }
792      } finally {
793        region.closeRegionOperation();
794      }
795      return result;
796    }, () -> region.createRegionSpan("RegionScanner.reseek"));
797  }
798
799  @Override
800  public void shipped() throws IOException {
801    if (storeHeap != null) {
802      storeHeap.shipped();
803    }
804    if (joinedHeap != null) {
805      joinedHeap.shipped();
806    }
807  }
808
809  @Override
810  public void run() throws IOException {
811    // This is the RPC callback method executed. We do the close in of the scanner in this
812    // callback
813    this.close();
814  }
815}