001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.NavigableSet; 025import java.util.Optional; 026import java.util.concurrent.CountDownLatch; 027import java.util.concurrent.locks.ReentrantLock; 028import java.util.function.IntConsumer; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.CellComparator; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.DoNotRetryIOException; 033import org.apache.hadoop.hbase.ExtendedCell; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.KeyValue; 036import org.apache.hadoop.hbase.KeyValueUtil; 037import org.apache.hadoop.hbase.PrivateCellUtil; 038import org.apache.hadoop.hbase.PrivateConstants; 039import org.apache.hadoop.hbase.client.IsolationLevel; 040import org.apache.hadoop.hbase.client.Scan; 041import org.apache.hadoop.hbase.conf.ConfigKey; 042import org.apache.hadoop.hbase.executor.ExecutorService; 043import org.apache.hadoop.hbase.filter.Filter; 044import org.apache.hadoop.hbase.ipc.RpcCall; 045import org.apache.hadoop.hbase.ipc.RpcServer; 046import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 047import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; 048import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler; 049import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher; 050import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; 051import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher; 052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 053import org.apache.yetus.audience.InterfaceAudience; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 058import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 059 060/** 061 * Scanner scans both the memstore and the Store. Coalesce KeyValue stream into List<KeyValue> 062 * for a single row. 063 * <p> 064 * The implementation is not thread safe. So there will be no race between next and close. The only 065 * exception is updateReaders, it will be called in the memstore flush thread to indicate that there 066 * is a flush. 067 */ 068@InterfaceAudience.Private 069public class StoreScanner extends NonReversedNonLazyKeyValueScanner 070 implements KeyValueScanner, InternalScanner, ChangedReadersObserver { 071 private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class); 072 // In unit tests, the store could be null 073 protected final HStore store; 074 private final CellComparator comparator; 075 private ScanQueryMatcher matcher; 076 protected KeyValueHeap heap; 077 private boolean cacheBlocks; 078 079 private long countPerRow = 0; 080 private int storeLimit = -1; 081 private int storeOffset = 0; 082 083 // Used to indicate that the scanner has closed (see HBASE-1107) 084 private volatile boolean closing = false; 085 private final boolean get; 086 private final boolean explicitColumnQuery; 087 private final boolean useRowColBloom; 088 /** 089 * A flag that enables StoreFileScanner parallel-seeking 090 */ 091 private boolean parallelSeekEnabled = false; 092 private ExecutorService executor; 093 private final Scan scan; 094 private final long oldestUnexpiredTS; 095 private final long now; 096 private final int minVersions; 097 private final long maxRowSize; 098 private final long cellsPerHeartbeatCheck; 099 long memstoreOnlyReads; 100 long mixedReads; 101 102 // 1) Collects all the KVHeap that are eagerly getting closed during the 103 // course of a scan 104 // 2) Collects the unused memstore scanners. If we close the memstore scanners 105 // before sending data to client, the chunk may be reclaimed by other 106 // updates and the data will be corrupt. 107 private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>(); 108 109 /** 110 * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not KVs skipped via 111 * seeking to next row/column. TODO: estimate them? 112 */ 113 private long kvsScanned = 0; 114 private ExtendedCell prevCell = null; 115 116 private final long preadMaxBytes; 117 private long bytesRead; 118 119 /** We don't ever expect to change this, the constant is just for clarity. */ 120 static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true; 121 public static final String STORESCANNER_PARALLEL_SEEK_ENABLE = 122 "hbase.storescanner.parallel.seek.enable"; 123 124 /** Used during unit testing to ensure that lazy seek does save seek ops */ 125 private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT; 126 127 /** 128 * The number of cells scanned in between timeout checks. Specifying a larger value means that 129 * timeout checks will occur less frequently. Specifying a small value will lead to more frequent 130 * timeout checks. 131 */ 132 public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 133 ConfigKey.LONG("hbase.cells.scanned.per.heartbeat.check"); 134 135 /** 136 * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}. 137 */ 138 public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000; 139 140 /** 141 * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we scanned 142 * reaches this limit, we will reopen the scanner with stream. The default value is 4 times of 143 * block size for this store. If configured with a value <0, for all scans with ReadType DEFAULT, 144 * we will open scanner with stream mode itself. 145 */ 146 public static final String STORESCANNER_PREAD_MAX_BYTES = 147 ConfigKey.LONG("hbase.storescanner.pread.max.bytes"); 148 149 private final Scan.ReadType readType; 150 151 // A flag whether use pread for scan 152 // it maybe changed if we use Scan.ReadType.DEFAULT and we have read lots of data. 153 private boolean scanUsePread; 154 // Indicates whether there was flush during the course of the scan 155 private volatile boolean flushed = false; 156 // generally we get one file from a flush 157 private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1); 158 // Since CompactingMemstore is now default, we get three memstore scanners from a flush 159 private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3); 160 // The current list of scanners 161 final List<KeyValueScanner> currentScanners = new ArrayList<>(); 162 // flush update lock 163 private final ReentrantLock flushLock = new ReentrantLock(); 164 // lock for closing. 165 private final ReentrantLock closeLock = new ReentrantLock(); 166 167 protected final long readPt; 168 private boolean topChanged = false; 169 170 /** An internal constructor. */ 171 private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt, 172 boolean cacheBlocks, ScanType scanType) { 173 this.readPt = readPt; 174 this.store = store; 175 this.cacheBlocks = cacheBlocks; 176 this.comparator = Preconditions.checkNotNull(scanInfo.getComparator()); 177 get = scan.isGetScan(); 178 explicitColumnQuery = numColumns > 0; 179 this.scan = scan; 180 this.now = EnvironmentEdgeManager.currentTime(); 181 this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl(); 182 this.minVersions = scanInfo.getMinVersions(); 183 184 // We look up row-column Bloom filters for multi-column queries as part of 185 // the seek operation. However, we also look the row-column Bloom filter 186 // for multi-row (non-"get") scans because this is not done in 187 // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>). 188 this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1) && (store == null 189 || store.getColumnFamilyDescriptor().getBloomFilterType() == BloomType.ROWCOL); 190 this.maxRowSize = scanInfo.getTableMaxRowSize(); 191 this.preadMaxBytes = scanInfo.getPreadMaxBytes(); 192 if (get) { 193 this.readType = Scan.ReadType.PREAD; 194 this.scanUsePread = true; 195 } else if (scanType != ScanType.USER_SCAN) { 196 // For compaction scanners never use Pread as already we have stream based scanners on the 197 // store files to be compacted 198 this.readType = Scan.ReadType.STREAM; 199 this.scanUsePread = false; 200 } else { 201 if (scan.getReadType() == Scan.ReadType.DEFAULT) { 202 if (scanInfo.isUsePread()) { 203 this.readType = Scan.ReadType.PREAD; 204 } else if (this.preadMaxBytes < 0) { 205 this.readType = Scan.ReadType.STREAM; 206 } else { 207 this.readType = Scan.ReadType.DEFAULT; 208 } 209 } else { 210 this.readType = scan.getReadType(); 211 } 212 // Always start with pread unless user specific stream. Will change to stream later if 213 // readType is default if the scan keeps running for a long time. 214 this.scanUsePread = this.readType != Scan.ReadType.STREAM; 215 } 216 this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck(); 217 // Parallel seeking is on if the config allows and more there is more than one store file. 218 if (store != null && store.getStorefilesCount() > 1) { 219 RegionServerServices rsService = store.getHRegion().getRegionServerServices(); 220 if (rsService != null && scanInfo.isParallelSeekEnabled()) { 221 this.parallelSeekEnabled = true; 222 this.executor = rsService.getExecutorService(); 223 } 224 } 225 } 226 227 private void addCurrentScanners(List<? extends KeyValueScanner> scanners) { 228 this.currentScanners.addAll(scanners); 229 } 230 231 private static boolean isOnlyLatestVersionScan(Scan scan) { 232 // No need to check for Scan#getMaxVersions because live version files generated by store file 233 // writer retains max versions specified in ColumnFamilyDescriptor for the given CF 234 return !scan.isRaw() && scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP; 235 } 236 237 /** 238 * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we are not in a 239 * compaction. 240 * @param store who we scan 241 * @param scan the spec 242 * @param columns which columns we are scanning 243 */ 244 public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns, 245 long readPt) throws IOException { 246 this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt, scan.getCacheBlocks(), 247 ScanType.USER_SCAN); 248 if (columns != null && scan.isRaw()) { 249 throw new DoNotRetryIOException("Cannot specify any column for a raw scan"); 250 } 251 matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, 252 store.getCoprocessorHost()); 253 254 store.addChangedReaderObserver(this); 255 256 List<KeyValueScanner> scanners = null; 257 try { 258 // Pass columns to try to filter out unnecessary StoreFiles. 259 scanners = selectScannersFrom(store, 260 store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(), 261 scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt, 262 isOnlyLatestVersionScan(scan))); 263 264 // Seek all scanners to the start of the Row (or if the exact matching row 265 // key does not exist, then to the start of the next matching Row). 266 // Always check bloom filter to optimize the top row seek for delete 267 // family marker. 268 seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally, 269 parallelSeekEnabled); 270 271 // set storeLimit 272 this.storeLimit = scan.getMaxResultsPerColumnFamily(); 273 274 // set rowOffset 275 this.storeOffset = scan.getRowOffsetPerColumnFamily(); 276 addCurrentScanners(scanners); 277 // Combine all seeked scanners with a heap 278 resetKVHeap(scanners, comparator); 279 } catch (IOException e) { 280 clearAndClose(scanners); 281 // remove us from the HStore#changedReaderObservers here or we'll have no chance to 282 // and might cause memory leak 283 store.deleteChangedReaderObserver(this); 284 throw e; 285 } 286 } 287 288 // a dummy scan instance for compaction. 289 private static final Scan SCAN_FOR_COMPACTION = new Scan(); 290 291 /** 292 * Used for store file compaction and memstore compaction. 293 * <p> 294 * Opens a scanner across specified StoreFiles/MemStoreSegments. 295 * @param store who we scan 296 * @param scanners ancillary scanners 297 * @param smallestReadPoint the readPoint that we should use for tracking versions 298 */ 299 public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners, 300 ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { 301 this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null); 302 } 303 304 /** 305 * Used for compactions that drop deletes from a limited range of rows. 306 * <p> 307 * Opens a scanner across specified StoreFiles. 308 * @param store who we scan 309 * @param scanners ancillary scanners 310 * @param smallestReadPoint the readPoint that we should use for tracking versions 311 * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW. 312 * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW. 313 */ 314 public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners, 315 long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) 316 throws IOException { 317 this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, 318 earliestPutTs, dropDeletesFromRow, dropDeletesToRow); 319 } 320 321 private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners, 322 ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, 323 byte[] dropDeletesToRow) throws IOException { 324 this(store, SCAN_FOR_COMPACTION, scanInfo, 0, 325 store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType); 326 assert scanType != ScanType.USER_SCAN; 327 matcher = 328 CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs, 329 oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost()); 330 331 // Filter the list of scanners using Bloom filters, time range, TTL, etc. 332 scanners = selectScannersFrom(store, scanners); 333 334 // Seek all scanners to the initial key 335 seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); 336 addCurrentScanners(scanners); 337 // Combine all seeked scanners with a heap 338 resetKVHeap(scanners, comparator); 339 } 340 341 private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners) 342 throws IOException { 343 // Seek all scanners to the initial key 344 seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); 345 addCurrentScanners(scanners); 346 resetKVHeap(scanners, comparator); 347 } 348 349 // For mob compaction only as we do not have a Store instance when doing mob compaction. 350 public StoreScanner(ScanInfo scanInfo, ScanType scanType, 351 List<? extends KeyValueScanner> scanners) throws IOException { 352 this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType); 353 assert scanType != ScanType.USER_SCAN; 354 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L, 355 oldestUnexpiredTS, now, null, null, null); 356 seekAllScanner(scanInfo, scanners); 357 } 358 359 // Used to instantiate a scanner for user scan in test 360 StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, 361 List<? extends KeyValueScanner> scanners, ScanType scanType) throws IOException { 362 // 0 is passed as readpoint because the test bypasses Store 363 this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(), 364 scanType); 365 if (scanType == ScanType.USER_SCAN) { 366 this.matcher = 367 UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null); 368 } else { 369 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 370 PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null); 371 } 372 seekAllScanner(scanInfo, scanners); 373 } 374 375 // Used to instantiate a scanner for user scan in test 376 StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, 377 List<? extends KeyValueScanner> scanners) throws IOException { 378 // 0 is passed as readpoint because the test bypasses Store 379 this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(), 380 ScanType.USER_SCAN); 381 this.matcher = 382 UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null); 383 seekAllScanner(scanInfo, scanners); 384 } 385 386 // Used to instantiate a scanner for compaction in test 387 StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType, 388 List<? extends KeyValueScanner> scanners) throws IOException { 389 // 0 is passed as readpoint because the test bypasses Store 390 this(null, maxVersions > 0 ? new Scan().readVersions(maxVersions) : SCAN_FOR_COMPACTION, 391 scanInfo, 0, 0L, false, scanType); 392 this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 393 PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null); 394 seekAllScanner(scanInfo, scanners); 395 } 396 397 boolean isScanUsePread() { 398 return this.scanUsePread; 399 } 400 401 /** 402 * Seek the specified scanners with the given key 403 * @param isLazy true if using lazy seek 404 * @param isParallelSeek true if using parallel seek 405 */ 406 protected void seekScanners(List<? extends KeyValueScanner> scanners, ExtendedCell seekKey, 407 boolean isLazy, boolean isParallelSeek) throws IOException { 408 // Seek all scanners to the start of the Row (or if the exact matching row 409 // key does not exist, then to the start of the next matching Row). 410 // Always check bloom filter to optimize the top row seek for delete 411 // family marker. 412 if (isLazy) { 413 for (KeyValueScanner scanner : scanners) { 414 scanner.requestSeek(seekKey, false, true); 415 } 416 } else { 417 if (!isParallelSeek) { 418 long totalScannersSoughtBytes = 0; 419 for (KeyValueScanner scanner : scanners) { 420 if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) { 421 throw new RowTooBigException( 422 "Max row size allowed: " + maxRowSize + ", but row is bigger than that"); 423 } 424 scanner.seek(seekKey); 425 Cell c = scanner.peek(); 426 if (c != null) { 427 totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c); 428 } 429 } 430 } else { 431 parallelSeek(scanners, seekKey); 432 } 433 } 434 } 435 436 protected void resetKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator) 437 throws IOException { 438 // Combine all seeked scanners with a heap 439 heap = newKVHeap(scanners, comparator); 440 } 441 442 protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners, 443 CellComparator comparator) throws IOException { 444 return new KeyValueHeap(scanners, comparator); 445 } 446 447 /** 448 * Filters the given list of scanners using Bloom filter, time range, and TTL. 449 * <p> 450 * Will be overridden by testcase so declared as protected. 451 */ 452 protected List<KeyValueScanner> selectScannersFrom(HStore store, 453 List<? extends KeyValueScanner> allScanners) { 454 boolean memOnly; 455 boolean filesOnly; 456 if (scan instanceof InternalScan) { 457 InternalScan iscan = (InternalScan) scan; 458 memOnly = iscan.isCheckOnlyMemStore(); 459 filesOnly = iscan.isCheckOnlyStoreFiles(); 460 } else { 461 memOnly = false; 462 filesOnly = false; 463 } 464 465 List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size()); 466 467 // We can only exclude store files based on TTL if minVersions is set to 0. 468 // Otherwise, we might have to return KVs that have technically expired. 469 long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE; 470 471 // include only those scan files which pass all filters 472 for (KeyValueScanner kvs : allScanners) { 473 boolean isFile = kvs.isFileScanner(); 474 if ((!isFile && filesOnly) || (isFile && memOnly)) { 475 kvs.close(); 476 continue; 477 } 478 479 if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) { 480 scanners.add(kvs); 481 } else { 482 kvs.close(); 483 } 484 } 485 return scanners; 486 } 487 488 @Override 489 public ExtendedCell peek() { 490 return heap != null ? heap.peek() : null; 491 } 492 493 @Override 494 public KeyValue next() { 495 // throw runtime exception perhaps? 496 throw new RuntimeException("Never call StoreScanner.next()"); 497 } 498 499 @Override 500 public void close() { 501 close(true); 502 } 503 504 private void close(boolean withDelayedScannersClose) { 505 closeLock.lock(); 506 // If the closeLock is acquired then any subsequent updateReaders() 507 // call is ignored. 508 try { 509 if (this.closing) { 510 return; 511 } 512 if (withDelayedScannersClose) { 513 this.closing = true; 514 } 515 // For mob compaction, we do not have a store. 516 if (this.store != null) { 517 this.store.deleteChangedReaderObserver(this); 518 } 519 if (withDelayedScannersClose) { 520 clearAndClose(scannersForDelayedClose); 521 clearAndClose(memStoreScannersAfterFlush); 522 clearAndClose(flushedstoreFileScanners); 523 if (this.heap != null) { 524 this.heap.close(); 525 this.currentScanners.clear(); 526 this.heap = null; // CLOSED! 527 } 528 } else { 529 if (this.heap != null) { 530 this.scannersForDelayedClose.add(this.heap); 531 this.currentScanners.clear(); 532 this.heap = null; 533 } 534 } 535 } finally { 536 closeLock.unlock(); 537 } 538 } 539 540 @Override 541 public boolean seek(ExtendedCell key) throws IOException { 542 if (checkFlushed()) { 543 reopenAfterFlush(); 544 } 545 return this.heap.seek(key); 546 } 547 548 /** 549 * Get the next row of values from this Store. 550 * @return true if there are more rows, false if scanner is done 551 */ 552 @Override 553 public boolean next(List<? super ExtendedCell> outResult, ScannerContext scannerContext) 554 throws IOException { 555 if (scannerContext == null) { 556 throw new IllegalArgumentException("Scanner context cannot be null"); 557 } 558 if (checkFlushed() && reopenAfterFlush()) { 559 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 560 } 561 562 // if the heap was left null, then the scanners had previously run out anyways, close and 563 // return. 564 if (this.heap == null) { 565 // By this time partial close should happened because already heap is null 566 close(false);// Do all cleanup except heap.close() 567 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 568 } 569 570 ExtendedCell cell = this.heap.peek(); 571 if (cell == null) { 572 close(false);// Do all cleanup except heap.close() 573 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 574 } 575 576 // only call setRow if the row changes; avoids confusing the query matcher 577 // if scanning intra-row 578 579 // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing 580 // rows. Else it is possible we are still traversing the same row so we must perform the row 581 // comparison. 582 if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) { 583 this.countPerRow = 0; 584 matcher.setToNewRow(cell); 585 } 586 587 // Clear progress away unless invoker has indicated it should be kept. 588 if (!scannerContext.getKeepProgress() && !scannerContext.getSkippingRow()) { 589 scannerContext.clearProgress(); 590 } 591 592 Optional<RpcCall> rpcCall = 593 matcher.isUserScan() ? RpcServer.getCurrentCall() : Optional.empty(); 594 // re-useable closure to avoid allocations 595 IntConsumer recordBlockSize = blockSize -> { 596 if (rpcCall.isPresent()) { 597 rpcCall.get().incrementBlockBytesScanned(blockSize); 598 } 599 scannerContext.incrementBlockProgress(blockSize); 600 }; 601 602 int count = 0; 603 long totalBytesRead = 0; 604 // track the cells for metrics only if it is a user read request. 605 boolean onlyFromMemstore = matcher.isUserScan(); 606 try { 607 LOOP: do { 608 // Update and check the time limit based on the configured value of cellsPerTimeoutCheck 609 // Or if the preadMaxBytes is reached and we may want to return so we can switch to stream 610 // in 611 // the shipped method below. 612 if ( 613 kvsScanned % cellsPerHeartbeatCheck == 0 614 || (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) 615 ) { 616 if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { 617 return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); 618 } 619 } 620 // Do object compare - we set prevKV from the same heap. 621 if (prevCell != cell) { 622 ++kvsScanned; 623 } 624 checkScanOrder(prevCell, cell, comparator); 625 int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell); 626 bytesRead += cellSize; 627 if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) { 628 // return immediately if we want to switch from pread to stream. We need this because we 629 // can 630 // only switch in the shipped method, if user use a filter to filter out everything and 631 // rpc 632 // timeout is very large then the shipped method will never be called until the whole scan 633 // is finished, but at that time we have already scan all the data... 634 // See HBASE-20457 for more details. 635 // And there is still a scenario that can not be handled. If we have a very large row, 636 // which 637 // have millions of qualifiers, and filter.filterRow is used, then even if we set the flag 638 // here, we still need to scan all the qualifiers before returning... 639 scannerContext.returnImmediately(); 640 } 641 642 heap.recordBlockSize(recordBlockSize); 643 644 prevCell = cell; 645 scannerContext.setLastPeekedCell(cell); 646 topChanged = false; 647 ScanQueryMatcher.MatchCode qcode = matcher.match(cell); 648 switch (qcode) { 649 case INCLUDE: 650 case INCLUDE_AND_SEEK_NEXT_ROW: 651 case INCLUDE_AND_SEEK_NEXT_COL: 652 Filter f = matcher.getFilter(); 653 if (f != null) { 654 Cell transformedCell = f.transformCell(cell); 655 // fast path, most filters just return the same cell instance 656 if (transformedCell != cell) { 657 if (transformedCell instanceof ExtendedCell) { 658 cell = (ExtendedCell) transformedCell; 659 } else { 660 throw new DoNotRetryIOException("Incorrect filter implementation, " 661 + "the Cell returned by transformCell is not an ExtendedCell. Filter class: " 662 + f.getClass().getName()); 663 } 664 } 665 } 666 this.countPerRow++; 667 668 // add to results only if we have skipped #storeOffset kvs 669 // also update metric accordingly 670 if (this.countPerRow > storeOffset) { 671 outResult.add(cell); 672 673 // Update local tracking information 674 count++; 675 totalBytesRead += cellSize; 676 677 /** 678 * Increment the metric if all the cells are from memstore. If not we will account it 679 * for mixed reads 680 */ 681 onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore(); 682 // Update the progress of the scanner context 683 scannerContext.incrementSizeProgress(cellSize, cell.heapSize()); 684 scannerContext.incrementBatchProgress(1); 685 686 if (matcher.isUserScan() && totalBytesRead > maxRowSize) { 687 String message = "Max row size allowed: " + maxRowSize 688 + ", but the row is bigger than that, the row info: " 689 + CellUtil.toString(cell, false) + ", already have process row cells = " 690 + outResult.size() + ", it belong to region = " 691 + store.getHRegion().getRegionInfo().getRegionNameAsString(); 692 LOG.warn(message); 693 throw new RowTooBigException(message); 694 } 695 696 if (storeLimit > -1 && this.countPerRow >= (storeLimit + storeOffset)) { 697 // do what SEEK_NEXT_ROW does. 698 if (!matcher.moreRowsMayExistAfter(cell)) { 699 close(false);// Do all cleanup except heap.close() 700 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 701 } 702 matcher.clearCurrentRow(); 703 seekToNextRow(cell); 704 break LOOP; 705 } 706 } 707 708 if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { 709 if (!matcher.moreRowsMayExistAfter(cell)) { 710 close(false);// Do all cleanup except heap.close() 711 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 712 } 713 matcher.clearCurrentRow(); 714 seekOrSkipToNextRow(cell); 715 } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { 716 seekOrSkipToNextColumn(cell); 717 } else { 718 this.heap.next(); 719 } 720 721 if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) { 722 break LOOP; 723 } 724 if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { 725 break LOOP; 726 } 727 continue; 728 729 case DONE: 730 // Optimization for Gets! If DONE, no more to get on this row, early exit! 731 if (get) { 732 // Then no more to this row... exit. 733 close(false);// Do all cleanup except heap.close() 734 // update metric 735 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 736 } 737 matcher.clearCurrentRow(); 738 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 739 740 case DONE_SCAN: 741 close(false);// Do all cleanup except heap.close() 742 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 743 744 case SEEK_NEXT_ROW: 745 // This is just a relatively simple end of scan fix, to short-cut end 746 // us if there is an endKey in the scan. 747 if (!matcher.moreRowsMayExistAfter(cell)) { 748 close(false);// Do all cleanup except heap.close() 749 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 750 } 751 matcher.clearCurrentRow(); 752 seekOrSkipToNextRow(cell); 753 NextState stateAfterSeekNextRow = needToReturn(outResult); 754 if (stateAfterSeekNextRow != null) { 755 return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues(); 756 } 757 break; 758 759 case SEEK_NEXT_COL: 760 seekOrSkipToNextColumn(cell); 761 NextState stateAfterSeekNextColumn = needToReturn(outResult); 762 if (stateAfterSeekNextColumn != null) { 763 return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues(); 764 } 765 break; 766 767 case SKIP: 768 this.heap.next(); 769 break; 770 771 case SEEK_NEXT_USING_HINT: 772 ExtendedCell nextKV = matcher.getNextKeyHint(cell); 773 if (nextKV != null) { 774 int difference = comparator.compare(nextKV, cell); 775 if ( 776 ((!scan.isReversed() && difference > 0) || (scan.isReversed() && difference < 0)) 777 ) { 778 seekAsDirection(nextKV); 779 NextState stateAfterSeekByHint = needToReturn(outResult); 780 if (stateAfterSeekByHint != null) { 781 return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues(); 782 } 783 break; 784 } 785 } 786 heap.next(); 787 break; 788 789 default: 790 throw new RuntimeException("UNEXPECTED"); 791 } 792 793 // One last chance to break due to size limit. The INCLUDE* cases above already check 794 // limit and continue. For the various filtered cases, we need to check because block 795 // size limit may have been exceeded even if we don't add cells to result list. 796 if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { 797 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 798 } 799 } while ((cell = this.heap.peek()) != null); 800 801 if (count > 0) { 802 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); 803 } 804 805 // No more keys 806 close(false);// Do all cleanup except heap.close() 807 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); 808 } finally { 809 // increment only if we have some result 810 if (count > 0 && matcher.isUserScan()) { 811 // if true increment memstore metrics, if not the mixed one 812 updateMetricsStore(onlyFromMemstore); 813 } 814 } 815 } 816 817 private void updateMetricsStore(boolean memstoreRead) { 818 if (store != null) { 819 store.updateMetricsStore(memstoreRead); 820 } else { 821 // for testing. 822 if (memstoreRead) { 823 memstoreOnlyReads++; 824 } else { 825 mixedReads++; 826 } 827 } 828 } 829 830 /** 831 * If the top cell won't be flushed into disk, the new top cell may be changed after 832 * #reopenAfterFlush. Because the older top cell only exist in the memstore scanner but the 833 * memstore scanner is replaced by hfile scanner after #reopenAfterFlush. If the row of top cell 834 * is changed, we should return the current cells. Otherwise, we may return the cells across 835 * different rows. 836 * @param outResult the cells which are visible for user scan 837 * @return null is the top cell doesn't change. Otherwise, the NextState to return 838 */ 839 private NextState needToReturn(List<? super ExtendedCell> outResult) { 840 if (!outResult.isEmpty() && topChanged) { 841 return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES; 842 } 843 return null; 844 } 845 846 private void seekOrSkipToNextRow(ExtendedCell cell) throws IOException { 847 // If it is a Get Scan, then we know that we are done with this row; there are no more 848 // rows beyond the current one: don't try to optimize. 849 if (!get) { 850 if (trySkipToNextRow(cell)) { 851 return; 852 } 853 } 854 seekToNextRow(cell); 855 } 856 857 private void seekOrSkipToNextColumn(ExtendedCell cell) throws IOException { 858 if (!trySkipToNextColumn(cell)) { 859 seekAsDirection(matcher.getKeyForNextColumn(cell)); 860 } 861 } 862 863 /** 864 * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109). 865 * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row, or seek to an 866 * arbitrary seek key. This method decides whether a seek is the most efficient _actual_ way to 867 * get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, SKIP inside the 868 * current, loaded block). It does this by looking at the next indexed key of the current HFile. 869 * This key is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible 870 * key on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work 871 * with the current Cell but compare as though it were a seek key; see down in 872 * matcher.compareKeyForNextRow, etc). If the compare gets us onto the next block we *_SEEK, 873 * otherwise we just SKIP to the next requested cell. 874 * <p> 875 * Other notes: 876 * <ul> 877 * <li>Rows can straddle block boundaries</li> 878 * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a 879 * different block than column C1 at T2)</li> 880 * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a few 881 * SKIPs...</li> 882 * <li>We want to SEEK when the chance is high that we'll be able to seek past many Cells, 883 * especially if we know we need to go to the next block.</li> 884 * </ul> 885 * <p> 886 * A good proxy (best effort) to determine whether SKIP is better than SEEK is whether we'll 887 * likely end up seeking to the next block (or past the next block) to get our next column. 888 * Example: 889 * 890 * <pre> 891 * | BLOCK 1 | BLOCK 2 | 892 * | r1/c1, r1/c2, r1/c3 | r1/c4, r1/c5, r2/c1 | 893 * ^ ^ 894 * | | 895 * Next Index Key SEEK_NEXT_ROW (before r2/c1) 896 * 897 * 898 * | BLOCK 1 | BLOCK 2 | 899 * | r1/c1/t5, r1/c1/t4, r1/c1/t3 | r1/c1/t2, r1/c1/T1, r1/c2/T3 | 900 * ^ ^ 901 * | | 902 * Next Index Key SEEK_NEXT_COL 903 * </pre> 904 * 905 * Now imagine we want columns c1 and c3 (see first diagram above), the 'Next Index Key' of r1/c4 906 * is > r1/c3 so we should seek to get to the c1 on the next row, r2. In second case, say we only 907 * want one version of c1, after we have it, a SEEK_COL will be issued to get to c2. Looking at 908 * the 'Next Index Key', it would land us in the next block, so we should SEEK. In other scenarios 909 * where the SEEK will not land us in the next block, it is very likely better to issues a series 910 * of SKIPs. 911 * @param cell current cell 912 * @return true means skip to next row, false means not 913 */ 914 protected boolean trySkipToNextRow(ExtendedCell cell) throws IOException { 915 ExtendedCell nextCell = null; 916 // used to guard against a changed next indexed key by doing a identity comparison 917 // when the identity changes we need to compare the bytes again 918 ExtendedCell previousIndexedKey = null; 919 do { 920 ExtendedCell nextIndexedKey = getNextIndexedKey(); 921 if ( 922 nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY 923 && (nextIndexedKey == previousIndexedKey 924 || matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) 925 ) { 926 this.heap.next(); 927 ++kvsScanned; 928 previousIndexedKey = nextIndexedKey; 929 } else { 930 return false; 931 } 932 } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell)); 933 return true; 934 } 935 936 /** 937 * See {@link #trySkipToNextRow(ExtendedCell)} 938 * @param cell current cell 939 * @return true means skip to next column, false means not 940 */ 941 protected boolean trySkipToNextColumn(ExtendedCell cell) throws IOException { 942 ExtendedCell nextCell = null; 943 // used to guard against a changed next indexed key by doing a identity comparison 944 // when the identity changes we need to compare the bytes again 945 ExtendedCell previousIndexedKey = null; 946 do { 947 ExtendedCell nextIndexedKey = getNextIndexedKey(); 948 if ( 949 nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY 950 && (nextIndexedKey == previousIndexedKey 951 || matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) 952 ) { 953 this.heap.next(); 954 ++kvsScanned; 955 previousIndexedKey = nextIndexedKey; 956 } else { 957 return false; 958 } 959 } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell)); 960 // We need this check because it may happen that the new scanner that we get 961 // during heap.next() is requiring reseek due of fake KV previously generated for 962 // ROWCOL bloom filter optimization. See HBASE-19863 for more details 963 if ( 964 useRowColBloom && nextCell != null && cell.getTimestamp() == PrivateConstants.OLDEST_TIMESTAMP 965 ) { 966 return false; 967 } 968 return true; 969 } 970 971 @Override 972 public long getReadPoint() { 973 return this.readPt; 974 } 975 976 private static void clearAndClose(List<KeyValueScanner> scanners) { 977 if (scanners == null) { 978 return; 979 } 980 for (KeyValueScanner s : scanners) { 981 s.close(); 982 } 983 scanners.clear(); 984 } 985 986 // Implementation of ChangedReadersObserver 987 @Override 988 public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners) 989 throws IOException { 990 if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) { 991 return; 992 } 993 boolean updateReaders = false; 994 flushLock.lock(); 995 try { 996 if (!closeLock.tryLock()) { 997 // The reason for doing this is that when the current store scanner does not retrieve 998 // any new cells, then the scanner is considered to be done. The heap of this scanner 999 // is not closed till the shipped() call is completed. Hence in that case if at all 1000 // the partial close (close (false)) has been called before updateReaders(), there is no 1001 // need for the updateReaders() to happen. 1002 LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders"); 1003 // no lock acquired. 1004 clearAndClose(memStoreScanners); 1005 return; 1006 } 1007 // lock acquired 1008 updateReaders = true; 1009 if (this.closing) { 1010 LOG.debug("StoreScanner already closing. There is no need to updateReaders"); 1011 clearAndClose(memStoreScanners); 1012 return; 1013 } 1014 flushed = true; 1015 final boolean isCompaction = false; 1016 boolean usePread = get || scanUsePread; 1017 // SEE HBASE-19468 where the flushed files are getting compacted even before a scanner 1018 // calls next(). So its better we create scanners here rather than next() call. Ensure 1019 // these scanners are properly closed() whether or not the scan is completed successfully 1020 // Eagerly creating scanners so that we have the ref counting ticking on the newly created 1021 // store files. In case of stream scanners this eager creation does not induce performance 1022 // penalty because in scans (that uses stream scanners) the next() call is bound to happen. 1023 List<KeyValueScanner> scanners = 1024 store.getScanners(sfs, cacheBlocks, get, usePread, isCompaction, matcher, 1025 scan.getStartRow(), scan.getStopRow(), this.readPt, false, isOnlyLatestVersionScan(scan)); 1026 flushedstoreFileScanners.addAll(scanners); 1027 if (!CollectionUtils.isEmpty(memStoreScanners)) { 1028 clearAndClose(memStoreScannersAfterFlush); 1029 memStoreScannersAfterFlush.addAll(memStoreScanners); 1030 } 1031 } finally { 1032 flushLock.unlock(); 1033 if (updateReaders) { 1034 closeLock.unlock(); 1035 } 1036 } 1037 // Let the next() call handle re-creating and seeking 1038 } 1039 1040 /** Returns if top of heap has changed (and KeyValueHeap has to try the next KV) */ 1041 protected final boolean reopenAfterFlush() throws IOException { 1042 // here we can make sure that we have a Store instance so no null check on store. 1043 ExtendedCell lastTop = heap.peek(); 1044 // When we have the scan object, should we not pass it to getScanners() to get a limited set of 1045 // scanners? We did so in the constructor and we could have done it now by storing the scan 1046 // object from the constructor 1047 List<KeyValueScanner> scanners; 1048 flushLock.lock(); 1049 try { 1050 List<KeyValueScanner> allScanners = 1051 new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size()); 1052 allScanners.addAll(flushedstoreFileScanners); 1053 allScanners.addAll(memStoreScannersAfterFlush); 1054 scanners = selectScannersFrom(store, allScanners); 1055 // Clear the current set of flushed store files scanners so that they don't get added again 1056 flushedstoreFileScanners.clear(); 1057 memStoreScannersAfterFlush.clear(); 1058 } finally { 1059 flushLock.unlock(); 1060 } 1061 1062 // Seek the new scanners to the last key 1063 seekScanners(scanners, lastTop, false, parallelSeekEnabled); 1064 // remove the older memstore scanner 1065 for (int i = currentScanners.size() - 1; i >= 0; i--) { 1066 if (!currentScanners.get(i).isFileScanner()) { 1067 scannersForDelayedClose.add(currentScanners.remove(i)); 1068 } else { 1069 // we add the memstore scanner to the end of currentScanners 1070 break; 1071 } 1072 } 1073 // add the newly created scanners on the flushed files and the current active memstore scanner 1074 addCurrentScanners(scanners); 1075 // Combine all seeked scanners with a heap 1076 resetKVHeap(this.currentScanners, store.getComparator()); 1077 resetQueryMatcher(lastTop); 1078 if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) { 1079 LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString() 1080 + ",and after = " + heap.peek()); 1081 topChanged = true; 1082 } else { 1083 topChanged = false; 1084 } 1085 return topChanged; 1086 } 1087 1088 private void resetQueryMatcher(ExtendedCell lastTopKey) { 1089 // Reset the state of the Query Matcher and set to top row. 1090 // Only reset and call setRow if the row changes; avoids confusing the 1091 // query matcher if scanning intra-row. 1092 ExtendedCell cell = heap.peek(); 1093 if (cell == null) { 1094 cell = lastTopKey; 1095 } 1096 if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) { 1097 this.countPerRow = 0; 1098 // The setToNewRow will call reset internally 1099 matcher.setToNewRow(cell); 1100 } 1101 } 1102 1103 /** 1104 * Check whether scan as expected order 1105 */ 1106 protected void checkScanOrder(Cell prevKV, Cell kv, CellComparator comparator) 1107 throws IOException { 1108 // Check that the heap gives us KVs in an increasing order. 1109 assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 1110 : "Key " + prevKV + " followed by a smaller key " + kv + " in cf " + store; 1111 } 1112 1113 protected boolean seekToNextRow(ExtendedCell c) throws IOException { 1114 return reseek(PrivateCellUtil.createLastOnRow(c)); 1115 } 1116 1117 /** 1118 * Do a reseek in a normal StoreScanner(scan forward) 1119 * @return true if scanner has values left, false if end of scanner 1120 */ 1121 protected boolean seekAsDirection(ExtendedCell kv) throws IOException { 1122 return reseek(kv); 1123 } 1124 1125 @Override 1126 public boolean reseek(ExtendedCell kv) throws IOException { 1127 if (checkFlushed()) { 1128 reopenAfterFlush(); 1129 } 1130 if (explicitColumnQuery && lazySeekEnabledGlobally) { 1131 return heap.requestSeek(kv, true, useRowColBloom); 1132 } 1133 return heap.reseek(kv); 1134 } 1135 1136 void trySwitchToStreamRead() { 1137 if ( 1138 readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null 1139 || bytesRead < preadMaxBytes 1140 ) { 1141 return; 1142 } 1143 LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead, 1144 this.store.getColumnFamilyName()); 1145 scanUsePread = false; 1146 ExtendedCell lastTop = heap.peek(); 1147 List<KeyValueScanner> memstoreScanners = new ArrayList<>(); 1148 List<KeyValueScanner> scannersToClose = new ArrayList<>(); 1149 for (KeyValueScanner kvs : currentScanners) { 1150 if (!kvs.isFileScanner()) { 1151 // collect memstorescanners here 1152 memstoreScanners.add(kvs); 1153 } else { 1154 scannersToClose.add(kvs); 1155 } 1156 } 1157 List<KeyValueScanner> fileScanners = null; 1158 List<KeyValueScanner> newCurrentScanners; 1159 KeyValueHeap newHeap; 1160 try { 1161 // We must have a store instance here so no null check 1162 // recreate the scanners on the current file scanners 1163 fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, matcher, 1164 scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), 1165 readPt, false); 1166 if (fileScanners == null) { 1167 return; 1168 } 1169 seekScanners(fileScanners, lastTop, false, parallelSeekEnabled); 1170 newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size()); 1171 newCurrentScanners.addAll(fileScanners); 1172 newCurrentScanners.addAll(memstoreScanners); 1173 newHeap = newKVHeap(newCurrentScanners, comparator); 1174 } catch (Exception e) { 1175 LOG.warn("failed to switch to stream read", e); 1176 if (fileScanners != null) { 1177 fileScanners.forEach(KeyValueScanner::close); 1178 } 1179 return; 1180 } 1181 currentScanners.clear(); 1182 addCurrentScanners(newCurrentScanners); 1183 this.heap = newHeap; 1184 resetQueryMatcher(lastTop); 1185 scannersToClose.forEach(KeyValueScanner::close); 1186 } 1187 1188 protected final boolean checkFlushed() { 1189 // check the var without any lock. Suppose even if we see the old 1190 // value here still it is ok to continue because we will not be resetting 1191 // the heap but will continue with the referenced memstore's snapshot. For compactions 1192 // any way we don't need the updateReaders at all to happen as we still continue with 1193 // the older files 1194 if (flushed) { 1195 // If there is a flush and the current scan is notified on the flush ensure that the 1196 // scan's heap gets reset and we do a seek on the newly flushed file. 1197 if (this.closing) { 1198 return false; 1199 } 1200 // reset the flag 1201 flushed = false; 1202 return true; 1203 } 1204 return false; 1205 } 1206 1207 /** 1208 * Seek storefiles in parallel to optimize IO latency as much as possible 1209 * @param scanners the list {@link KeyValueScanner}s to be read from 1210 * @param kv the KeyValue on which the operation is being requested 1211 */ 1212 private void parallelSeek(final List<? extends KeyValueScanner> scanners, final ExtendedCell kv) 1213 throws IOException { 1214 if (scanners.isEmpty()) return; 1215 int storeFileScannerCount = scanners.size(); 1216 CountDownLatch latch = new CountDownLatch(storeFileScannerCount); 1217 List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount); 1218 for (KeyValueScanner scanner : scanners) { 1219 if (scanner instanceof StoreFileScanner) { 1220 ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, this.readPt, latch); 1221 executor.submit(seekHandler); 1222 handlers.add(seekHandler); 1223 } else { 1224 scanner.seek(kv); 1225 latch.countDown(); 1226 } 1227 } 1228 1229 try { 1230 latch.await(); 1231 } catch (InterruptedException ie) { 1232 throw (InterruptedIOException) new InterruptedIOException().initCause(ie); 1233 } 1234 1235 for (ParallelSeekHandler handler : handlers) { 1236 if (handler.getErr() != null) { 1237 throw new IOException(handler.getErr()); 1238 } 1239 } 1240 } 1241 1242 /** 1243 * Used in testing. 1244 * @return all scanners in no particular order 1245 */ 1246 List<KeyValueScanner> getAllScannersForTesting() { 1247 List<KeyValueScanner> allScanners = new ArrayList<>(); 1248 KeyValueScanner current = heap.getCurrentForTesting(); 1249 if (current != null) allScanners.add(current); 1250 for (KeyValueScanner scanner : heap.getHeap()) 1251 allScanners.add(scanner); 1252 return allScanners; 1253 } 1254 1255 static void enableLazySeekGlobally(boolean enable) { 1256 lazySeekEnabledGlobally = enable; 1257 } 1258 1259 /** Returns The estimated number of KVs seen by this scanner (includes some skipped KVs). */ 1260 public long getEstimatedNumberOfKvsScanned() { 1261 return this.kvsScanned; 1262 } 1263 1264 @Override 1265 public ExtendedCell getNextIndexedKey() { 1266 return this.heap.getNextIndexedKey(); 1267 } 1268 1269 @Override 1270 public void shipped() throws IOException { 1271 if (prevCell != null) { 1272 // Do the copy here so that in case the prevCell ref is pointing to the previous 1273 // blocks we can safely release those blocks. 1274 // This applies to blocks that are got from Bucket cache, L1 cache and the blocks 1275 // fetched from HDFS. Copying this would ensure that we let go the references to these 1276 // blocks so that they can be GCed safely(in case of bucket cache) 1277 prevCell = KeyValueUtil.toNewKeyCell(this.prevCell); 1278 } 1279 matcher.beforeShipped(); 1280 // There wont be further fetch of Cells from these scanners. Just close. 1281 clearAndClose(scannersForDelayedClose); 1282 if (this.heap != null) { 1283 this.heap.shipped(); 1284 // When switching from pread to stream, we will open a new scanner for each store file, but 1285 // the old scanner may still track the HFileBlocks we have scanned but not sent back to client 1286 // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others 1287 // before we serialize and send it back to client. The HFileBlocks will be released in shipped 1288 // method, so we here will also open new scanners and close old scanners in shipped method. 1289 // See HBASE-18055 for more details. 1290 trySwitchToStreamRead(); 1291 } 1292 } 1293}