/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntConsumer;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.PrivateConstants;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigKey;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * Scanner scans both the memstore and the Store. Coalesces the KeyValue stream into a
 * List&lt;KeyValue&gt; for a single row.
 * <p>
 * The implementation is not thread safe, so callers must not invoke next and close concurrently;
 * there is therefore no race between them. The only exception is updateReaders, which is called
 * from the memstore flush thread to indicate that a flush has happened.
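 * <p>
 * A minimal usage sketch for a user scan (hypothetical variable names; assumes the store,
 * scanInfo, scan spec, columns and read point are already available):
 *
 * <pre>
 * StoreScanner scanner = new StoreScanner(store, scanInfo, scan, columns, readPt);
 * try {
 *   List&lt;ExtendedCell&gt; cells = new ArrayList&lt;&gt;();
 *   ScannerContext context = ScannerContext.newBuilder().build();
 *   boolean moreRows;
 *   do {
 *     moreRows = scanner.next(cells, context);
 *     // hand the cells collected for the current row to the caller, then reuse the list
 *     cells.clear();
 *   } while (moreRows);
 * } finally {
 *   scanner.close();
 * }
 * </pre>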
 */
@InterfaceAudience.Private
public class StoreScanner extends NonReversedNonLazyKeyValueScanner
  implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
  private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class);
  // In unit tests, the store could be null
  protected final HStore store;
  private final CellComparator comparator;
  private ScanQueryMatcher matcher;
  protected KeyValueHeap heap;
  private boolean cacheBlocks;

  private long countPerRow = 0;
  private int storeLimit = -1;
  private int storeOffset = 0;

  // Used to indicate that the scanner has closed (see HBASE-1107)
  private volatile boolean closing = false;
  private final boolean get;
  private final boolean explicitColumnQuery;
  private final boolean useRowColBloom;
  /**
   * A flag that enables StoreFileScanner parallel-seeking
   */
  private boolean parallelSeekEnabled = false;
  private ExecutorService executor;
  private final Scan scan;
  private final long oldestUnexpiredTS;
  private final long now;
  private final int minVersions;
  private final long maxRowSize;
  private final long cellsPerHeartbeatCheck;
  long memstoreOnlyReads;
  long mixedReads;

  // 1) Collects all the KVHeaps that are eagerly closed during the course of a scan.
  // 2) Collects the unused memstore scanners. If we close the memstore scanners
  // before sending data to the client, the chunk may be reclaimed by other
  // updates and the data will be corrupted.
  private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>();

  /**
   * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not KVs skipped via
   * seeking to next row/column. TODO: estimate them?
   */
  private long kvsScanned = 0;
  private ExtendedCell prevCell = null;

  private final long preadMaxBytes;
  private long bytesRead;

  /** We don't ever expect to change this, the constant is just for clarity. */
  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
  public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
    "hbase.storescanner.parallel.seek.enable";

  /** Used during unit testing to ensure that lazy seek does save seek ops */
  private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;

  /**
   * The number of cells scanned in between timeout checks. Specifying a larger value means that
   * timeout checks will occur less frequently. Specifying a small value will lead to more frequent
   * timeout checks.
   */
  public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
    ConfigKey.LONG("hbase.cells.scanned.per.heartbeat.check");

  /**
   * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.
   */
  public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;

  /**
   * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the bytes we have
   * scanned reach this limit, we will reopen the scanner with stream. The default value is 4 times
   * the block size for this store. If configured with a value &lt;0, all scans with ReadType
   * DEFAULT will open the scanner in stream mode directly.
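   * For example, with the common 64 KB HFile block size the default limit works out to roughly
   * 256 KB of scanned cell bytes before the switch (assuming the block size has not been tuned).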
   */
  public static final String STORESCANNER_PREAD_MAX_BYTES =
    ConfigKey.LONG("hbase.storescanner.pread.max.bytes");

  private final Scan.ReadType readType;

  // A flag indicating whether to use pread for the scan.
  // It may be changed if we use Scan.ReadType.DEFAULT and we have read lots of data.
  private boolean scanUsePread;
  // Indicates whether there was a flush during the course of the scan
  private volatile boolean flushed = false;
  // generally we get one file from a flush
  private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1);
  // Since CompactingMemstore is now default, we get three memstore scanners from a flush
  private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
  // The current list of scanners
  final List<KeyValueScanner> currentScanners = new ArrayList<>();
  // flush update lock
  private final ReentrantLock flushLock = new ReentrantLock();
  // lock for closing.
  private final ReentrantLock closeLock = new ReentrantLock();

  protected final long readPt;
  private boolean topChanged = false;

  /** An internal constructor. */
  private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt,
    boolean cacheBlocks, ScanType scanType) {
    this.readPt = readPt;
    this.store = store;
    this.cacheBlocks = cacheBlocks;
    this.comparator = Preconditions.checkNotNull(scanInfo.getComparator());
    get = scan.isGetScan();
    explicitColumnQuery = numColumns > 0;
    this.scan = scan;
    this.now = EnvironmentEdgeManager.currentTime();
    this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl();
    this.minVersions = scanInfo.getMinVersions();

    // We look up row-column Bloom filters for multi-column queries as part of
    // the seek operation. However, we also look up the row-column Bloom filter
    // for multi-row (non-"get") scans because this is not done in
    // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
    this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1) && (store == null
      || store.getColumnFamilyDescriptor().getBloomFilterType() == BloomType.ROWCOL);
    this.maxRowSize = scanInfo.getTableMaxRowSize();
    this.preadMaxBytes = scanInfo.getPreadMaxBytes();
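    // Summary of the read type decision below:
    // - Gets always use pread.
    // - Non-user scans (compactions) always use stream, since stream-based scanners already exist
    //   on the files being compacted.
    // - User scans with ReadType.DEFAULT start with pread and may switch to stream later,
    //   honoring scanInfo.isUsePread() and preadMaxBytes; any other explicit ReadType is used
    //   as given.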
    if (get) {
      this.readType = Scan.ReadType.PREAD;
      this.scanUsePread = true;
    } else if (scanType != ScanType.USER_SCAN) {
      // Compaction scanners never use pread, as we already have stream-based scanners on the
      // store files to be compacted
      this.readType = Scan.ReadType.STREAM;
      this.scanUsePread = false;
    } else {
      if (scan.getReadType() == Scan.ReadType.DEFAULT) {
        if (scanInfo.isUsePread()) {
          this.readType = Scan.ReadType.PREAD;
        } else if (this.preadMaxBytes < 0) {
          this.readType = Scan.ReadType.STREAM;
        } else {
          this.readType = Scan.ReadType.DEFAULT;
        }
      } else {
        this.readType = scan.getReadType();
      }
      // Always start with pread unless the user specified stream. Will change to stream later,
      // if readType is DEFAULT and the scan keeps running for a long time.
      this.scanUsePread = this.readType != Scan.ReadType.STREAM;
    }
    this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
    // Parallel seeking is on if the config allows it and there is more than one store file.
    if (store != null && store.getStorefilesCount() > 1) {
      RegionServerServices rsService = store.getHRegion().getRegionServerServices();
      if (rsService != null && scanInfo.isParallelSeekEnabled()) {
        this.parallelSeekEnabled = true;
        this.executor = rsService.getExecutorService();
      }
    }
  }

  private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
    this.currentScanners.addAll(scanners);
  }

  private static boolean isOnlyLatestVersionScan(Scan scan) {
    // No need to check Scan#getMaxVersions, because live version files generated by the store
    // file writer retain the max versions specified in the ColumnFamilyDescriptor for the given CF
    return !scan.isRaw() && scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP;
  }

  /**
   * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we are not in a
   * compaction.
   * @param store   who we scan
   * @param scan    the spec
   * @param columns which columns we are scanning
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
    long readPt) throws IOException {
    this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt, scan.getCacheBlocks(),
      ScanType.USER_SCAN);
    if (columns != null && scan.isRaw()) {
      throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
    }
    matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
      store.getCoprocessorHost());

    store.addChangedReaderObserver(this);

    List<KeyValueScanner> scanners = null;
    try {
      // Pass columns to try to filter out unnecessary StoreFiles.
      scanners = selectScannersFrom(store,
        store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
          scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt,
          isOnlyLatestVersionScan(scan)));

      // Seek all scanners to the start of the Row (or if the exact matching row
      // key does not exist, then to the start of the next matching Row).
      // Always check bloom filter to optimize the top row seek for delete
      // family marker.
      seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
        parallelSeekEnabled);

      // set storeLimit
      this.storeLimit = scan.getMaxResultsPerColumnFamily();

      // set rowOffset
      this.storeOffset = scan.getRowOffsetPerColumnFamily();
      addCurrentScanners(scanners);
      // Combine all seeked scanners with a heap
      resetKVHeap(scanners, comparator);
    } catch (IOException e) {
      clearAndClose(scanners);
      // remove us from the HStore#changedReaderObservers here, or we'll have no chance to do it
      // later, which might cause a memory leak
      store.deleteChangedReaderObserver(this);
      throw e;
    }
  }

  // a dummy scan instance for compaction.
  private static final Scan SCAN_FOR_COMPACTION = new Scan();

  /**
   * Used for store file compaction and memstore compaction.
   * <p>
   * Opens a scanner across specified StoreFiles/MemStoreSegments.
   * @param store             who we scan
   * @param scanners          ancillary scanners
   * @param smallestReadPoint the readPoint that we should use for tracking versions
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
    ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
    this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
  }

  /**
   * Used for compactions that drop deletes from a limited range of rows.
   * <p>
   * Opens a scanner across specified StoreFiles.
   * @param store              who we scan
   * @param scanners           ancillary scanners
   * @param smallestReadPoint  the readPoint that we should use for tracking versions
   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
   * @param dropDeletesToRow   The exclusive right bound of the range; can be EMPTY_END_ROW.
   */
  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
    long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow)
    throws IOException {
    this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
      earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
  }

  private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
    ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
    byte[] dropDeletesToRow) throws IOException {
    this(store, SCAN_FOR_COMPACTION, scanInfo, 0,
      store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType);
    assert scanType != ScanType.USER_SCAN;
    matcher =
      CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs,
        oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());

    // Filter the list of scanners using Bloom filters, time range, TTL, etc.
    scanners = selectScannersFrom(store, scanners);

    // Seek all scanners to the initial key
    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
    addCurrentScanners(scanners);
    // Combine all seeked scanners with a heap
    resetKVHeap(scanners, comparator);
  }

  private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners)
    throws IOException {
    // Seek all scanners to the initial key
    seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
    addCurrentScanners(scanners);
    resetKVHeap(scanners, comparator);
  }

  // For mob compaction only as we do not have a Store instance when doing mob compaction.
  public StoreScanner(ScanInfo scanInfo, ScanType scanType,
    List<? extends KeyValueScanner> scanners) throws IOException {
    this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
    assert scanType != ScanType.USER_SCAN;
    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L,
      oldestUnexpiredTS, now, null, null, null);
    seekAllScanner(scanInfo, scanners);
  }

  // Used to instantiate a scanner for user scan in test
  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
    List<? extends KeyValueScanner> scanners, ScanType scanType) throws IOException {
    // 0 is passed as readpoint because the test bypasses Store
    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
      scanType);
    if (scanType == ScanType.USER_SCAN) {
      this.matcher =
        UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
    } else {
      this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
        PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
    }
    seekAllScanner(scanInfo, scanners);
  }

  // Used to instantiate a scanner for user scan in test
  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
    List<? extends KeyValueScanner> scanners) throws IOException {
    // 0 is passed as readpoint because the test bypasses Store
    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
      ScanType.USER_SCAN);
    this.matcher =
      UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
    seekAllScanner(scanInfo, scanners);
  }

  // Used to instantiate a scanner for compaction in test
  StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType,
    List<? extends KeyValueScanner> scanners) throws IOException {
    // 0 is passed as readpoint because the test bypasses Store
    this(null, maxVersions > 0 ? new Scan().readVersions(maxVersions) : SCAN_FOR_COMPACTION,
      scanInfo, 0, 0L, false, scanType);
    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
      PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
    seekAllScanner(scanInfo, scanners);
  }

  boolean isScanUsePread() {
    return this.scanUsePread;
  }

  /**
   * Seek the specified scanners with the given key
   * @param isLazy         true if using lazy seek
   * @param isParallelSeek true if using parallel seek
   */
  protected void seekScanners(List<? extends KeyValueScanner> scanners, ExtendedCell seekKey,
    boolean isLazy, boolean isParallelSeek) throws IOException {
    // Seek all scanners to the start of the Row (or if the exact matching row
    // key does not exist, then to the start of the next matching Row).
    // Always check bloom filter to optimize the top row seek for delete
    // family marker.
    if (isLazy) {
      for (KeyValueScanner scanner : scanners) {
        scanner.requestSeek(seekKey, false, true);
      }
    } else {
      if (!isParallelSeek) {
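        // Track the estimated serialized size of the cell at the top of each scanner after it is
        // sought; for user scans, abort with RowTooBigException once the accumulated size already
        // exceeds the configured max row size, instead of seeking the remaining scanners.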
        long totalScannersSoughtBytes = 0;
        for (KeyValueScanner scanner : scanners) {
          if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) {
            throw new RowTooBigException(
              "Max row size allowed: " + maxRowSize + ", but row is bigger than that");
          }
          scanner.seek(seekKey);
          Cell c = scanner.peek();
          if (c != null) {
            totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c);
          }
        }
      } else {
        parallelSeek(scanners, seekKey);
      }
    }
  }

  protected void resetKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
    throws IOException {
    // Combine all seeked scanners with a heap
    heap = newKVHeap(scanners, comparator);
  }

  protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners,
    CellComparator comparator) throws IOException {
    return new KeyValueHeap(scanners, comparator);
  }

  /**
   * Filters the given list of scanners using Bloom filter, time range, and TTL.
   * <p>
   * Will be overridden by testcase so declared as protected.
   */
  protected List<KeyValueScanner> selectScannersFrom(HStore store,
    List<? extends KeyValueScanner> allScanners) {
    boolean memOnly;
    boolean filesOnly;
    if (scan instanceof InternalScan) {
      InternalScan iscan = (InternalScan) scan;
      memOnly = iscan.isCheckOnlyMemStore();
      filesOnly = iscan.isCheckOnlyStoreFiles();
    } else {
      memOnly = false;
      filesOnly = false;
    }

    List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size());

    // We can only exclude store files based on TTL if minVersions is set to 0.
    // Otherwise, we might have to return KVs that have technically expired.
    long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE;

    // include only those scan files which pass all filters
    for (KeyValueScanner kvs : allScanners) {
      boolean isFile = kvs.isFileScanner();
      if ((!isFile && filesOnly) || (isFile && memOnly)) {
        kvs.close();
        continue;
      }

      if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
        scanners.add(kvs);
      } else {
        kvs.close();
      }
    }
    return scanners;
  }

  @Override
  public ExtendedCell peek() {
    return heap != null ? heap.peek() : null;
  }

  @Override
  public KeyValue next() {
    // throw runtime exception perhaps?
    throw new RuntimeException("Never call StoreScanner.next()");
  }

  @Override
  public void close() {
    close(true);
  }

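  /**
   * Close the scanner.
   * @param withDelayedScannersClose true for a full close, which releases the heap and all
   *          delayed-close scanners immediately; false for the partial close issued from next()
   *          when the scan has run out of values, which parks the heap in
   *          scannersForDelayedClose so that cells already handed out are not freed before
   *          shipped() is called.
   */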
  private void close(boolean withDelayedScannersClose) {
    closeLock.lock();
    // If the closeLock is acquired then any subsequent updateReaders()
    // call is ignored.
    try {
      if (this.closing) {
        return;
      }
      if (withDelayedScannersClose) {
        this.closing = true;
      }
      // For mob compaction, we do not have a store.
      if (this.store != null) {
        this.store.deleteChangedReaderObserver(this);
      }
      if (withDelayedScannersClose) {
        clearAndClose(scannersForDelayedClose);
        clearAndClose(memStoreScannersAfterFlush);
        clearAndClose(flushedstoreFileScanners);
        if (this.heap != null) {
          this.heap.close();
          this.currentScanners.clear();
          this.heap = null; // CLOSED!
        }
      } else {
        if (this.heap != null) {
          this.scannersForDelayedClose.add(this.heap);
          this.currentScanners.clear();
          this.heap = null;
        }
      }
    } finally {
      closeLock.unlock();
    }
  }

  @Override
  public boolean seek(ExtendedCell key) throws IOException {
    if (checkFlushed()) {
      reopenAfterFlush();
    }
    return this.heap.seek(key);
  }

  /**
   * Get the next row of values from this Store.
   * @return true if there are more rows, false if scanner is done
   */
  @Override
  public boolean next(List<? super ExtendedCell> outResult, ScannerContext scannerContext)
    throws IOException {
    if (scannerContext == null) {
      throw new IllegalArgumentException("Scanner context cannot be null");
    }
    if (checkFlushed() && reopenAfterFlush()) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // If the heap was left null, then the scanners had previously run out anyway; close and
    // return.
    if (this.heap == null) {
      // By this time the partial close should have happened, because the heap is already null
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    ExtendedCell cell = this.heap.peek();
    if (cell == null) {
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    // only call setRow if the row changes; avoids confusing the query matcher
    // if scanning intra-row

    // If no limit exists in the scope LimitScope.BETWEEN_CELLS then we are sure we are changing
    // rows. Else it is possible we are still traversing the same row so we must perform the row
    // comparison.
    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) {
      this.countPerRow = 0;
      matcher.setToNewRow(cell);
    }

    // Clear progress away unless invoker has indicated it should be kept.
    if (!scannerContext.getKeepProgress() && !scannerContext.getSkippingRow()) {
      scannerContext.clearProgress();
    }

    Optional<RpcCall> rpcCall =
      matcher.isUserScan() ? RpcServer.getCurrentCall() : Optional.empty();
    // reusable closure to avoid allocations
    IntConsumer recordBlockSize = blockSize -> {
      if (rpcCall.isPresent()) {
        rpcCall.get().incrementBlockBytesScanned(blockSize);
      }
      scannerContext.incrementBlockProgress(blockSize);
    };

    int count = 0;
    long totalBytesRead = 0;
    // track the cells for metrics only if it is a user read request.
    boolean onlyFromMemstore = matcher.isUserScan();
    try {
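      // Main per-cell loop: peek the top cell of the heap, ask the query matcher what to do with
      // it (include it, skip it, seek to the next column/row, or finish), and keep going until a
      // batch/size/time limit is reached or the heap runs out of cells.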
      LOOP: do {
        // Update and check the time limit based on the configured value of cellsPerTimeoutCheck,
        // or if the preadMaxBytes is reached and we may want to return so we can switch to stream
        // in the shipped method below.
        if (
          kvsScanned % cellsPerHeartbeatCheck == 0
            || (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)
        ) {
          if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
            return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
          }
        }
        // Do object compare - we set prevCell from the same heap.
        if (prevCell != cell) {
          ++kvsScanned;
        }
        checkScanOrder(prevCell, cell, comparator);
        int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
        bytesRead += cellSize;
        if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) {
          // Return immediately if we want to switch from pread to stream. We need this because we
          // can only switch in the shipped method; if the user uses a filter that filters out
          // everything and the rpc timeout is very large, then the shipped method will never be
          // called until the whole scan is finished, but by then we have already scanned all the
          // data... See HBASE-20457 for more details.
          // And there is still a scenario that can not be handled: if we have a very large row
          // with millions of qualifiers, and filter.filterRow is used, then even if we set the
          // flag here, we still need to scan all the qualifiers before returning...
          scannerContext.returnImmediately();
        }

        heap.recordBlockSize(recordBlockSize);

        prevCell = cell;
        scannerContext.setLastPeekedCell(cell);
        topChanged = false;
        ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
        switch (qcode) {
          case INCLUDE:
          case INCLUDE_AND_SEEK_NEXT_ROW:
          case INCLUDE_AND_SEEK_NEXT_COL:
            Filter f = matcher.getFilter();
            if (f != null) {
              Cell transformedCell = f.transformCell(cell);
              // fast path, most filters just return the same cell instance
              if (transformedCell != cell) {
                if (transformedCell instanceof ExtendedCell) {
                  cell = (ExtendedCell) transformedCell;
                } else {
                  throw new DoNotRetryIOException("Incorrect filter implementation, "
                    + "the Cell returned by transformCell is not an ExtendedCell. Filter class: "
                    + f.getClass().getName());
                }
              }
            }
            this.countPerRow++;

            // add to results only if we have skipped #storeOffset kvs
            // also update metric accordingly
            if (this.countPerRow > storeOffset) {
              outResult.add(cell);

              // Update local tracking information
              count++;
              totalBytesRead += cellSize;

              /**
               * Increment the memstore metric only if all the cells are from the memstore;
               * otherwise we account it as a mixed read.
               */
              onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore();
              // Update the progress of the scanner context
              scannerContext.incrementSizeProgress(cellSize, cell.heapSize());
              scannerContext.incrementBatchProgress(1);

              if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
                String message = "Max row size allowed: " + maxRowSize
                  + ", but the row is bigger than that, the row info: "
                  + CellUtil.toString(cell, false) + ", already processed row cells = "
                  + outResult.size() + ", it belongs to region = "
                  + store.getHRegion().getRegionInfo().getRegionNameAsString();
                LOG.warn(message);
                throw new RowTooBigException(message);
              }

              if (storeLimit > -1 && this.countPerRow >= (storeLimit + storeOffset)) {
                // do what SEEK_NEXT_ROW does.
                if (!matcher.moreRowsMayExistAfter(cell)) {
                  close(false);// Do all cleanup except heap.close()
                  return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
                }
                matcher.clearCurrentRow();
                seekToNextRow(cell);
                break LOOP;
              }
            }

            if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
              if (!matcher.moreRowsMayExistAfter(cell)) {
                close(false);// Do all cleanup except heap.close()
                return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
              }
              matcher.clearCurrentRow();
              seekOrSkipToNextRow(cell);
            } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
              seekOrSkipToNextColumn(cell);
            } else {
              this.heap.next();
            }

            if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
              break LOOP;
            }
            if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
              break LOOP;
            }
            continue;

          case DONE:
            // Optimization for Gets! If DONE, no more to get on this row, early exit!
            if (get) {
              // Then no more to this row... exit.
              close(false);// Do all cleanup except heap.close()
              // update metric
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            matcher.clearCurrentRow();
            return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();

          case DONE_SCAN:
            close(false);// Do all cleanup except heap.close()
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();

          case SEEK_NEXT_ROW:
            // This is just a relatively simple end-of-scan fix, to short-circuit the scan
            // if there is an endKey in the scan.
            if (!matcher.moreRowsMayExistAfter(cell)) {
              close(false);// Do all cleanup except heap.close()
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            matcher.clearCurrentRow();
            seekOrSkipToNextRow(cell);
            NextState stateAfterSeekNextRow = needToReturn(outResult);
            if (stateAfterSeekNextRow != null) {
              return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
            }
            break;

          case SEEK_NEXT_COL:
            seekOrSkipToNextColumn(cell);
            NextState stateAfterSeekNextColumn = needToReturn(outResult);
            if (stateAfterSeekNextColumn != null) {
              return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
            }
            break;

          case SKIP:
            this.heap.next();
            break;

          case SEEK_NEXT_USING_HINT:
            ExtendedCell nextKV = matcher.getNextKeyHint(cell);
            if (nextKV != null) {
              int difference = comparator.compare(nextKV, cell);
              if (
                ((!scan.isReversed() && difference > 0) || (scan.isReversed() && difference < 0))
              ) {
                seekAsDirection(nextKV);
                NextState stateAfterSeekByHint = needToReturn(outResult);
                if (stateAfterSeekByHint != null) {
                  return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
                }
                break;
              }
            }
            heap.next();
            break;

          default:
            throw new RuntimeException("UNEXPECTED");
        }

        // One last chance to break due to size limit. The INCLUDE* cases above already check the
        // limit and continue. For the various filtered cases, we need to check because the block
        // size limit may have been exceeded even if we don't add cells to the result list.
        if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
        }
      } while ((cell = this.heap.peek()) != null);

      if (count > 0) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // No more keys
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    } finally {
      // increment only if we have some result
      if (count > 0 && matcher.isUserScan()) {
        // if true, increment the memstore metric; if not, the mixed one
        updateMetricsStore(onlyFromMemstore);
      }
    }
  }

  private void updateMetricsStore(boolean memstoreRead) {
    if (store != null) {
      store.updateMetricsStore(memstoreRead);
    } else {
      // for testing.
      if (memstoreRead) {
        memstoreOnlyReads++;
      } else {
        mixedReads++;
      }
    }
  }

  /**
   * If the top cell won't be flushed to disk, the top cell may change after #reopenAfterFlush,
   * because the older top cell only exists in the memstore scanner and the memstore scanner is
   * replaced by an hfile scanner after #reopenAfterFlush. If the row of the top cell has changed,
   * we should return the current cells; otherwise, we may return cells from different rows.
   * @param outResult the cells which are visible for user scan
   * @return null if the top cell doesn't change. Otherwise, the NextState to return
   */
  private NextState needToReturn(List<? super ExtendedCell> outResult) {
    if (!outResult.isEmpty() && topChanged) {
      return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
    }
    return null;
  }

  private void seekOrSkipToNextRow(ExtendedCell cell) throws IOException {
    // If it is a Get Scan, then we know that we are done with this row; there are no more
    // rows beyond the current one: don't try to optimize.
    if (!get) {
      if (trySkipToNextRow(cell)) {
        return;
      }
    }
    seekToNextRow(cell);
  }

  private void seekOrSkipToNextColumn(ExtendedCell cell) throws IOException {
    if (!trySkipToNextColumn(cell)) {
      seekAsDirection(matcher.getKeyForNextColumn(cell));
    }
  }

  /**
   * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).
   * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row, or seek to an
   * arbitrary seek key. This method decides whether a seek is the most efficient _actual_ way to
   * get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, SKIP inside the
   * current, loaded block). It does this by looking at the next indexed key of the current HFile.
   * This key is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible
   * key on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work
   * with the current Cell but compare as though it were a seek key; see down in
   * matcher.compareKeyForNextRow, etc). If the compare gets us onto the next block we *_SEEK,
   * otherwise we just SKIP to the next requested cell.
   * <p>
   * Other notes:
   * <ul>
   * <li>Rows can straddle block boundaries</li>
   * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a
   * different block than column C1 at T2)</li>
   * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a few
   * SKIPs...</li>
   * <li>We want to SEEK when the chance is high that we'll be able to seek past many Cells,
   * especially if we know we need to go to the next block.</li>
   * </ul>
   * <p>
   * A good proxy (best effort) to determine whether SKIP is better than SEEK is whether we'll
   * likely end up seeking to the next block (or past the next block) to get our next column.
   * Example:
   *
   * <pre>
   * |    BLOCK 1              |     BLOCK 2                   |
   * |  r1/c1, r1/c2, r1/c3    |    r1/c4, r1/c5, r2/c1        |
   *                                   ^         ^
   *                                   |         |
   *                           Next Index Key   SEEK_NEXT_ROW (before r2/c1)
   *
   *
   * |    BLOCK 1                       |     BLOCK 2                      |
   * |  r1/c1/t5, r1/c1/t4, r1/c1/t3    |    r1/c1/t2, r1/c1/T1, r1/c2/T3  |
   *                                            ^              ^
   *                                            |              |
   *                                    Next Index Key        SEEK_NEXT_COL
   * </pre>
   *
   * Now imagine we want columns c1 and c3 (see first diagram above): the 'Next Index Key' of r1/c4
   * is > r1/c3, so we should seek to get to the c1 on the next row, r2. In the second case, say we
   * only want one version of c1; after we have it, a SEEK_COL will be issued to get to c2. Looking
   * at the 'Next Index Key', it would land us in the next block, so we should SEEK. In other
   * scenarios where the SEEK will not land us in the next block, it is very likely better to issue
   * a series of SKIPs.
   * @param cell current cell
   * @return true means skip to next row, false means not
   */
  protected boolean trySkipToNextRow(ExtendedCell cell) throws IOException {
    ExtendedCell nextCell = null;
    // used to guard against a changed next indexed key by doing an identity comparison;
    // when the identity changes we need to compare the bytes again
    ExtendedCell previousIndexedKey = null;
    do {
      ExtendedCell nextIndexedKey = getNextIndexedKey();
      if (
        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
          && (nextIndexedKey == previousIndexedKey
            || matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0)
      ) {
        this.heap.next();
        ++kvsScanned;
        previousIndexedKey = nextIndexedKey;
      } else {
        return false;
      }
    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell));
    return true;
  }

  /**
   * See {@link #trySkipToNextRow(ExtendedCell)}
   * @param cell current cell
   * @return true means skip to next column, false means not
   */
  protected boolean trySkipToNextColumn(ExtendedCell cell) throws IOException {
    ExtendedCell nextCell = null;
    // used to guard against a changed next indexed key by doing an identity comparison;
    // when the identity changes we need to compare the bytes again
    ExtendedCell previousIndexedKey = null;
    do {
      ExtendedCell nextIndexedKey = getNextIndexedKey();
      if (
        nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
          && (nextIndexedKey == previousIndexedKey
            || matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0)
      ) {
        this.heap.next();
        ++kvsScanned;
        previousIndexedKey = nextIndexedKey;
      } else {
        return false;
      }
    } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell));
    // We need this check because it may happen that the new scanner that we get
    // during heap.next() requires a reseek due to the fake KV previously generated for
    // the ROWCOL bloom filter optimization. See HBASE-19863 for more details
    if (
      useRowColBloom && nextCell != null && cell.getTimestamp() == PrivateConstants.OLDEST_TIMESTAMP
    ) {
      return false;
    }
    return true;
  }

  @Override
  public long getReadPoint() {
    return this.readPt;
  }

  private static void clearAndClose(List<KeyValueScanner> scanners) {
    if (scanners == null) {
      return;
    }
    for (KeyValueScanner s : scanners) {
      s.close();
    }
    scanners.clear();
  }

  // Implementation of ChangedReadersObserver
  @Override
  public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners)
    throws IOException {
    if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) {
      return;
    }
    boolean updateReaders = false;
    flushLock.lock();
    try {
      if (!closeLock.tryLock()) {
        // The reason for doing this is that when the current store scanner does not retrieve
        // any new cells, the scanner is considered to be done. The heap of this scanner is not
        // closed till the shipped() call is completed. Hence, if the partial close (close(false))
        // has been called before updateReaders(), there is no need for the updateReaders() to
        // happen.
        LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
        // no lock acquired.
        clearAndClose(memStoreScanners);
        return;
      }
      // lock acquired
      updateReaders = true;
      if (this.closing) {
        LOG.debug("StoreScanner already closing. There is no need to updateReaders");
        clearAndClose(memStoreScanners);
        return;
      }
      flushed = true;
      final boolean isCompaction = false;
      boolean usePread = get || scanUsePread;
      // See HBASE-19468, where the flushed files were getting compacted even before a scanner
      // called next(). So it is better to create the scanners here rather than in the next()
      // call. Ensure these scanners are properly closed() whether or not the scan completes
      // successfully. Eagerly creating scanners keeps the ref counting ticking on the newly
      // created store files. In the case of stream scanners this eager creation does not induce
      // a performance penalty, because in scans (that use stream scanners) the next() call is
      // bound to happen.
      List<KeyValueScanner> scanners =
        store.getScanners(sfs, cacheBlocks, get, usePread, isCompaction, matcher,
          scan.getStartRow(), scan.getStopRow(), this.readPt, false, isOnlyLatestVersionScan(scan));
      flushedstoreFileScanners.addAll(scanners);
      if (!CollectionUtils.isEmpty(memStoreScanners)) {
        clearAndClose(memStoreScannersAfterFlush);
        memStoreScannersAfterFlush.addAll(memStoreScanners);
      }
    } finally {
      flushLock.unlock();
      if (updateReaders) {
        closeLock.unlock();
      }
    }
    // Let the next() call handle re-creating and seeking
  }

  /** Returns if top of heap has changed (and KeyValueHeap has to try the next KV) */
  protected final boolean reopenAfterFlush() throws IOException {
    // Here we can be sure that we have a Store instance, so no null check on store.
    ExtendedCell lastTop = heap.peek();
    // When we have the scan object, should we not pass it to getScanners() to get a limited set of
    // scanners? We did so in the constructor and we could have done it now by storing the scan
    // object from the constructor
    List<KeyValueScanner> scanners;
    flushLock.lock();
    try {
      List<KeyValueScanner> allScanners =
        new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());
      allScanners.addAll(flushedstoreFileScanners);
      allScanners.addAll(memStoreScannersAfterFlush);
      scanners = selectScannersFrom(store, allScanners);
      // Clear the current set of flushed store file scanners so that they don't get added again
      flushedstoreFileScanners.clear();
      memStoreScannersAfterFlush.clear();
    } finally {
      flushLock.unlock();
    }

    // Seek the new scanners to the last key
    seekScanners(scanners, lastTop, false, parallelSeekEnabled);
    // remove the older memstore scanners
    for (int i = currentScanners.size() - 1; i >= 0; i--) {
      if (!currentScanners.get(i).isFileScanner()) {
        scannersForDelayedClose.add(currentScanners.remove(i));
      } else {
        // memstore scanners are always added at the end of currentScanners, so once we hit a
        // file scanner we can stop
        break;
      }
    }
    // add the newly created scanners on the flushed files and the current active memstore scanner
    addCurrentScanners(scanners);
    // Combine all seeked scanners with a heap
    resetKVHeap(this.currentScanners, store.getComparator());
    resetQueryMatcher(lastTop);
    if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
      LOG.info("StoreScanner.peek() has changed, before = " + lastTop.toString()
        + ", and after = " + heap.peek());
      topChanged = true;
    } else {
      topChanged = false;
    }
    return topChanged;
  }

  private void resetQueryMatcher(ExtendedCell lastTopKey) {
    // Reset the state of the Query Matcher and set to top row.
    // Only reset and call setRow if the row changes; avoids confusing the
    // query matcher if scanning intra-row.
    ExtendedCell cell = heap.peek();
    if (cell == null) {
      cell = lastTopKey;
    }
    if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {
      this.countPerRow = 0;
      // The setToNewRow will call reset internally
      matcher.setToNewRow(cell);
    }
  }

  /**
   * Check whether the scan proceeds in the expected (increasing) order
   */
  protected void checkScanOrder(Cell prevKV, Cell kv, CellComparator comparator)
    throws IOException {
    // Check that the heap gives us KVs in an increasing order.
    assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0
      : "Key " + prevKV + " followed by a smaller key " + kv + " in cf " + store;
  }

  protected boolean seekToNextRow(ExtendedCell c) throws IOException {
    return reseek(PrivateCellUtil.createLastOnRow(c));
  }

  /**
   * Do a reseek in a normal StoreScanner (scan forward)
   * @return true if scanner has values left, false if end of scanner
   */
  protected boolean seekAsDirection(ExtendedCell kv) throws IOException {
    return reseek(kv);
  }

  @Override
  public boolean reseek(ExtendedCell kv) throws IOException {
    if (checkFlushed()) {
      reopenAfterFlush();
    }
    if (explicitColumnQuery && lazySeekEnabledGlobally) {
      return heap.requestSeek(kv, true, useRowColBloom);
    }
    return heap.reseek(kv);
  }

  void trySwitchToStreamRead() {
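    // Overall flow: bail out unless this is a DEFAULT-read-type scan still using pread, not
    // already closing, with a non-empty heap and at least preadMaxBytes bytes read; otherwise
    // recreate the file scanners in stream mode, seek them to the current top cell, rebuild the
    // heap together with the existing memstore scanners, and only then close the old pread-based
    // file scanners.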
    if (
      readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null
        || bytesRead < preadMaxBytes
    ) {
      return;
    }
    LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead,
      this.store.getColumnFamilyName());
    scanUsePread = false;
    ExtendedCell lastTop = heap.peek();
    List<KeyValueScanner> memstoreScanners = new ArrayList<>();
    List<KeyValueScanner> scannersToClose = new ArrayList<>();
    for (KeyValueScanner kvs : currentScanners) {
      if (!kvs.isFileScanner()) {
        // collect memstore scanners here
        memstoreScanners.add(kvs);
      } else {
        scannersToClose.add(kvs);
      }
    }
    List<KeyValueScanner> fileScanners = null;
    List<KeyValueScanner> newCurrentScanners;
    KeyValueHeap newHeap;
    try {
      // We must have a store instance here so no null check
      // recreate the scanners on the current file scanners
      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, matcher,
        scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
        readPt, false);
      if (fileScanners == null) {
        return;
      }
      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
      newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
      newCurrentScanners.addAll(fileScanners);
      newCurrentScanners.addAll(memstoreScanners);
      newHeap = newKVHeap(newCurrentScanners, comparator);
    } catch (Exception e) {
      LOG.warn("failed to switch to stream read", e);
      if (fileScanners != null) {
        fileScanners.forEach(KeyValueScanner::close);
      }
      return;
    }
    currentScanners.clear();
    addCurrentScanners(newCurrentScanners);
    this.heap = newHeap;
    resetQueryMatcher(lastTop);
    scannersToClose.forEach(KeyValueScanner::close);
  }

  protected final boolean checkFlushed() {
    // Check the flag without any lock. Even if we see a stale value here it is still ok to
    // continue, because we will not be resetting the heap but will continue with the referenced
    // memstore's snapshot. For compactions we don't need updateReaders to happen at all anyway,
    // as we still continue with the older files.
    if (flushed) {
      // If there is a flush and the current scan is notified of the flush, ensure that the
      // scan's heap gets reset and we do a seek on the newly flushed file.
      if (this.closing) {
        return false;
      }
      // reset the flag
      flushed = false;
      return true;
    }
    return false;
  }

  /**
   * Seek storefiles in parallel to optimize IO latency as much as possible
   * @param scanners the list of {@link KeyValueScanner}s to be read from
   * @param kv       the KeyValue on which the operation is being requested
   */
  private void parallelSeek(final List<? extends KeyValueScanner> scanners, final ExtendedCell kv)
    throws IOException {
    if (scanners.isEmpty()) return;
    int storeFileScannerCount = scanners.size();
    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
    List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount);
    for (KeyValueScanner scanner : scanners) {
      if (scanner instanceof StoreFileScanner) {
        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, this.readPt, latch);
        executor.submit(seekHandler);
        handlers.add(seekHandler);
      } else {
        scanner.seek(kv);
        latch.countDown();
      }
    }

    try {
      latch.await();
    } catch (InterruptedException ie) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
    }

    for (ParallelSeekHandler handler : handlers) {
      if (handler.getErr() != null) {
        throw new IOException(handler.getErr());
      }
    }
  }

  /**
   * Used in testing.
   * @return all scanners in no particular order
   */
  List<KeyValueScanner> getAllScannersForTesting() {
    List<KeyValueScanner> allScanners = new ArrayList<>();
    KeyValueScanner current = heap.getCurrentForTesting();
    if (current != null) allScanners.add(current);
    for (KeyValueScanner scanner : heap.getHeap())
      allScanners.add(scanner);
    return allScanners;
  }

  static void enableLazySeekGlobally(boolean enable) {
    lazySeekEnabledGlobally = enable;
  }

  /** Returns the estimated number of KVs seen by this scanner (includes some skipped KVs). */
  public long getEstimatedNumberOfKvsScanned() {
    return this.kvsScanned;
  }

  @Override
  public ExtendedCell getNextIndexedKey() {
    return this.heap.getNextIndexedKey();
  }

  @Override
  public void shipped() throws IOException {
    if (prevCell != null) {
      // Do the copy here so that, in case the prevCell ref is pointing to the previous
      // blocks, we can safely release those blocks.
      // This applies to blocks obtained from the bucket cache, the L1 cache and blocks
      // fetched from HDFS. Copying ensures that we let go of the references to these
      // blocks so that they can be GCed safely (in the case of the bucket cache).
      prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
    }
    matcher.beforeShipped();
    // There won't be any further fetching of Cells from these scanners. Just close.
    clearAndClose(scannersForDelayedClose);
    if (this.heap != null) {
      this.heap.shipped();
      // When switching from pread to stream, we will open a new scanner for each store file, but
      // the old scanner may still track the HFileBlocks we have scanned but not yet sent back to
      // the client. If we close the scanner immediately, then the HFileBlocks may be messed up by
      // others before we serialize and send them back to the client. The HFileBlocks will be
      // released in the shipped method, so here we also open new scanners and close old scanners
      // in the shipped method. See HBASE-18055 for more details.
      trySwitchToStreamRead();
    }
  }
}