001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Comparator;
023import java.util.List;
024import java.util.PriorityQueue;
025import java.util.function.IntConsumer;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.CellComparator;
028import org.apache.hadoop.hbase.ExtendedCell;
029import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * Implements a heap merge across any number of KeyValueScanners.
036 * <p>
037 * Implements KeyValueScanner itself.
038 * <p>
039 * This class is used at the Region level to merge across Stores and at the Store level to merge
040 * across the memstore and StoreFiles.
041 * <p>
042 * In the Region case, we also need InternalScanner.next(List), so this class also implements
043 * InternalScanner. WARNING: As is, if you try to use this as an InternalScanner at the Store level,
044 * you will get runtime exceptions.
045 */
046@InterfaceAudience.Private
047public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
048  implements KeyValueScanner, InternalScanner {
049  private static final Logger LOG = LoggerFactory.getLogger(KeyValueHeap.class);
050  protected PriorityQueue<KeyValueScanner> heap = null;
051  // Holds the scanners when a ever a eager close() happens. All such eagerly closed
052  // scans are collected and when the final scanner.close() happens will perform the
053  // actual close.
054  protected List<KeyValueScanner> scannersForDelayedClose = null;
055
056  /**
057   * The current sub-scanner, i.e. the one that contains the next key/value to return to the client.
058   * This scanner is NOT included in {@link #heap} (but we frequently add it back to the heap and
059   * pull the new winner out). We maintain an invariant that the current sub-scanner has already
060   * done a real seek, and that current.peek() is always a real key/value (or null) except for the
061   * fake last-key-on-row-column supplied by the multi-column Bloom filter optimization, which is OK
062   * to propagate to StoreScanner. In order to ensure that, always use {@link #pollRealKV()} to
063   * update current.
064   */
065  protected KeyValueScanner current = null;
066
067  protected KVScannerComparator comparator;
068
069  /**
070   * Constructor. This KeyValueHeap will handle closing of passed in KeyValueScanners.
071   */
072  public KeyValueHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
073    throws IOException {
074    this(scanners, new KVScannerComparator(comparator));
075  }
076
077  /**
078   * Constructor.
079   */
080  KeyValueHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator)
081    throws IOException {
082    this.comparator = comparator;
083    this.scannersForDelayedClose = new ArrayList<>(scanners.size());
084    if (!scanners.isEmpty()) {
085      this.heap = new PriorityQueue<>(scanners.size(), this.comparator);
086      for (KeyValueScanner scanner : scanners) {
087        if (scanner.peek() != null) {
088          this.heap.add(scanner);
089        } else {
090          this.scannersForDelayedClose.add(scanner);
091        }
092      }
093      this.current = pollRealKV();
094    }
095  }
096
097  @Override
098  public ExtendedCell peek() {
099    if (this.current == null) {
100      return null;
101    }
102    return this.current.peek();
103  }
104
105  boolean isLatestCellFromMemstore() {
106    return !this.current.isFileScanner();
107  }
108
109  @Override
110  public void recordBlockSize(IntConsumer blockSizeConsumer) {
111    this.current.recordBlockSize(blockSizeConsumer);
112  }
113
114  @Override
115  public ExtendedCell next() throws IOException {
116    if (this.current == null) {
117      return null;
118    }
119    ExtendedCell kvReturn = this.current.next();
120    ExtendedCell kvNext = this.current.peek();
121    if (kvNext == null) {
122      this.scannersForDelayedClose.add(this.current);
123      this.current = null;
124      this.current = pollRealKV();
125    } else {
126      KeyValueScanner topScanner = this.heap.peek();
127      // no need to add current back to the heap if it is the only scanner left
128      if (topScanner != null && this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
129        this.heap.add(this.current);
130        this.current = null;
131        this.current = pollRealKV();
132      }
133    }
134    return kvReturn;
135  }
136
137  /**
138   * Gets the next row of keys from the top-most scanner.
139   * <p>
140   * This method takes care of updating the heap.
141   * <p>
142   * This can ONLY be called when you are using Scanners that implement InternalScanner as well as
143   * KeyValueScanner (a {@link StoreScanner}).
144   * @return true if more rows exist after this one, false if scanner is done
145   */
146  @Override
147  public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext)
148    throws IOException {
149    if (this.current == null) {
150      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
151    }
152    InternalScanner currentAsInternal = (InternalScanner) this.current;
153    boolean moreCells = currentAsInternal.next(result, scannerContext);
154    Cell pee = this.current.peek();
155
156    /*
157     * By definition, any InternalScanner must return false only when it has no further rows to be
158     * fetched. So, we can close a scanner if it returns false. All existing implementations seem to
159     * be fine with this. It is much more efficient to close scanners which are not needed than keep
160     * them in the heap. This is also required for certain optimizations.
161     */
162
163    if (pee == null || !moreCells) {
164      // add the scanner that is to be closed
165      this.scannersForDelayedClose.add(this.current);
166    } else {
167      this.heap.add(this.current);
168    }
169    this.current = null;
170    this.current = pollRealKV();
171    if (this.current == null) {
172      moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
173    }
174    return moreCells;
175  }
176
177  protected static class KVScannerComparator implements Comparator<KeyValueScanner> {
178    protected CellComparator kvComparator;
179
180    /**
181     * Constructor
182     */
183    public KVScannerComparator(CellComparator kvComparator) {
184      this.kvComparator = kvComparator;
185    }
186
187    @Override
188    public int compare(KeyValueScanner left, KeyValueScanner right) {
189      int comparison = compare(left.peek(), right.peek());
190      if (comparison != 0) {
191        return comparison;
192      } else {
193        // Since both the keys are exactly the same, we break the tie in favor of higher ordered
194        // scanner since it'll have newer data. Since higher value should come first, we reverse
195        // sort here.
196        return Long.compare(right.getScannerOrder(), left.getScannerOrder());
197      }
198    }
199
200    /**
201     * Compares two KeyValue
202     * @return less than 0 if left is smaller, 0 if equal etc..
203     */
204    public int compare(Cell left, Cell right) {
205      return this.kvComparator.compare(left, right);
206    }
207
208    /**
209     *     */
210    public CellComparator getComparator() {
211      return this.kvComparator;
212    }
213  }
214
215  @Override
216  public void close() {
217    for (KeyValueScanner scanner : this.scannersForDelayedClose) {
218      scanner.close();
219    }
220    this.scannersForDelayedClose.clear();
221    if (this.current != null) {
222      this.current.close();
223    }
224    if (this.heap != null) {
225      // Order of closing the scanners shouldn't matter here, so simply iterate and close them.
226      for (KeyValueScanner scanner : heap) {
227        scanner.close();
228      }
229    }
230  }
231
232  /**
233   * Seeks all scanners at or below the specified seek key. If we earlied-out of a row, we may end
234   * up skipping values that were never reached yet. Rather than iterating down, we want to give the
235   * opportunity to re-seek.
236   * <p>
237   * As individual scanners may run past their ends, those scanners are automatically closed and
238   * removed from the heap.
239   * <p>
240   * This function (and {@link #reseek(ExtendedCell)}) does not do multi-column Bloom filter and
241   * lazy-seek optimizations. To enable those, call
242   * {@link #requestSeek(ExtendedCell, boolean, boolean)}.
243   * @param seekKey KeyValue to seek at or after
244   * @return true if KeyValues exist at or after specified key, false if not
245   */
246  @Override
247  public boolean seek(ExtendedCell seekKey) throws IOException {
248    return generalizedSeek(false, // This is not a lazy seek
249      seekKey, false, // forward (false: this is not a reseek)
250      false); // Not using Bloom filters
251  }
252
253  /**
254   * This function is identical to the {@link #seek(ExtendedCell)} function except that
255   * scanner.seek(seekKey) is changed to scanner.reseek(seekKey).
256   */
257  @Override
258  public boolean reseek(ExtendedCell seekKey) throws IOException {
259    return generalizedSeek(false, // This is not a lazy seek
260      seekKey, true, // forward (true because this is reseek)
261      false); // Not using Bloom filters
262  }
263
264  /**
265   * {@inheritDoc}
266   */
267  @Override
268  public boolean requestSeek(ExtendedCell key, boolean forward, boolean useBloom)
269    throws IOException {
270    return generalizedSeek(true, key, forward, useBloom);
271  }
272
273  /**
274   * @param isLazy   whether we are trying to seek to exactly the given row/col. Enables Bloom
275   *                 filter and most-recent-file-first optimizations for multi-column get/scan
276   *                 queries.
277   * @param seekKey  key to seek to
278   * @param forward  whether to seek forward (also known as reseek)
279   * @param useBloom whether to optimize seeks using Bloom filters
280   */
281  private boolean generalizedSeek(boolean isLazy, ExtendedCell seekKey, boolean forward,
282    boolean useBloom) throws IOException {
283    if (!isLazy && useBloom) {
284      throw new IllegalArgumentException(
285        "Multi-column Bloom filter " + "optimization requires a lazy seek");
286    }
287
288    if (current == null) {
289      return false;
290    }
291
292    KeyValueScanner scanner = current;
293    try {
294      while (scanner != null) {
295        Cell topKey = scanner.peek();
296        if (comparator.getComparator().compare(seekKey, topKey) <= 0) {
297          // Top KeyValue is at-or-after Seek KeyValue. We only know that all
298          // scanners are at or after seekKey (because fake keys of
299          // scanners where a lazy-seek operation has been done are not greater
300          // than their real next keys) but we still need to enforce our
301          // invariant that the top scanner has done a real seek. This way
302          // StoreScanner and RegionScanner do not have to worry about fake
303          // keys.
304          heap.add(scanner);
305          scanner = null;
306          current = pollRealKV();
307          return current != null;
308        }
309
310        boolean seekResult;
311        if (isLazy && heap.size() > 0) {
312          // If there is only one scanner left, we don't do lazy seek.
313          seekResult = scanner.requestSeek(seekKey, forward, useBloom);
314        } else {
315          seekResult = NonLazyKeyValueScanner.doRealSeek(scanner, seekKey, forward);
316        }
317
318        if (!seekResult) {
319          this.scannersForDelayedClose.add(scanner);
320        } else {
321          heap.add(scanner);
322        }
323        scanner = heap.poll();
324        if (scanner == null) {
325          current = null;
326        }
327      }
328    } catch (Exception e) {
329      if (scanner != null) {
330        try {
331          scanner.close();
332        } catch (Exception ce) {
333          LOG.warn("close KeyValueScanner error", ce);
334        }
335      }
336      throw e;
337    }
338
339    // Heap is returning empty, scanner is done
340    return false;
341  }
342
343  /**
344   * Fetches the top sub-scanner from the priority queue, ensuring that a real seek has been done on
345   * it. Works by fetching the top sub-scanner, and if it has not done a real seek, making it do so
346   * (which will modify its top KV), putting it back, and repeating this until success. Relies on
347   * the fact that on a lazy seek we set the current key of a StoreFileScanner to a KV that is not
348   * greater than the real next KV to be read from that file, so the scanner that bubbles up to the
349   * top of the heap will have global next KV in this scanner heap if (1) it has done a real seek
350   * and (2) its KV is the top among all top KVs (some of which are fake) in the scanner heap.
351   */
352  protected KeyValueScanner pollRealKV() throws IOException {
353    KeyValueScanner kvScanner = heap.poll();
354    if (kvScanner == null) {
355      return null;
356    }
357
358    while (kvScanner != null && !kvScanner.realSeekDone()) {
359      if (kvScanner.peek() != null) {
360        try {
361          kvScanner.enforceSeek();
362        } catch (IOException ioe) {
363          // Add the item to delayed close set in case it is leak from close
364          this.scannersForDelayedClose.add(kvScanner);
365          throw ioe;
366        }
367        Cell curKV = kvScanner.peek();
368        if (curKV != null) {
369          KeyValueScanner nextEarliestScanner = heap.peek();
370          if (nextEarliestScanner == null) {
371            // The heap is empty. Return the only possible scanner.
372            return kvScanner;
373          }
374
375          // Compare the current scanner to the next scanner. We try to avoid
376          // putting the current one back into the heap if possible.
377          Cell nextKV = nextEarliestScanner.peek();
378          if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
379            // We already have the scanner with the earliest KV, so return it.
380            return kvScanner;
381          }
382
383          // Otherwise, put the scanner back into the heap and let it compete
384          // against all other scanners (both those that have done a "real
385          // seek" and a "lazy seek").
386          heap.add(kvScanner);
387        } else {
388          // Close the scanner because we did a real seek and found out there
389          // are no more KVs.
390          this.scannersForDelayedClose.add(kvScanner);
391        }
392      } else {
393        // Close the scanner because it has already run out of KVs even before
394        // we had to do a real seek on it.
395        this.scannersForDelayedClose.add(kvScanner);
396      }
397      kvScanner = heap.poll();
398    }
399
400    return kvScanner;
401  }
402
403  /** Returns the current Heap */
404  public PriorityQueue<KeyValueScanner> getHeap() {
405    return this.heap;
406  }
407
408  KeyValueScanner getCurrentForTesting() {
409    return current;
410  }
411
412  @Override
413  public ExtendedCell getNextIndexedKey() {
414    // here we return the next index key from the top scanner
415    return current == null ? null : current.getNextIndexedKey();
416  }
417
418  @Override
419  public void shipped() throws IOException {
420    for (KeyValueScanner scanner : this.scannersForDelayedClose) {
421      scanner.close(); // There wont be further fetch of Cells from these scanners. Just close.
422    }
423    this.scannersForDelayedClose.clear();
424    if (this.current != null) {
425      this.current.shipped();
426    }
427    if (this.heap != null) {
428      for (KeyValueScanner scanner : this.heap) {
429        scanner.shipped();
430      }
431    }
432  }
433}