001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.List;
022import org.apache.commons.lang3.NotImplementedException;
023import org.apache.hadoop.hbase.Cell;
024import org.apache.hadoop.hbase.CellComparator;
025import org.apache.hadoop.hbase.CellUtil;
026import org.apache.hadoop.hbase.ExtendedCell;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.yetus.audience.InterfaceAudience;
029
030/**
031 * ReversedKeyValueHeap is used for supporting reversed scanning. Compared with KeyValueHeap, its
032 * scanner comparator is a little different (see ReversedKVScannerComparator), all seek is backward
033 * seek(see {@link KeyValueScanner#backwardSeek}), and it will jump to the previous row if it is
034 * already at the end of one row when calling next().
035 */
036@InterfaceAudience.Private
037public class ReversedKeyValueHeap extends KeyValueHeap {
038
039  /**
040   *   */
041  public ReversedKeyValueHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
042    throws IOException {
043    super(scanners, new ReversedKVScannerComparator(comparator));
044  }
045
046  @Override
047  public boolean seek(ExtendedCell seekKey) throws IOException {
048    throw new IllegalStateException("seek cannot be called on ReversedKeyValueHeap");
049  }
050
051  @Override
052  public boolean reseek(ExtendedCell seekKey) throws IOException {
053    throw new IllegalStateException("reseek cannot be called on ReversedKeyValueHeap");
054  }
055
056  @Override
057  public boolean requestSeek(ExtendedCell key, boolean forward, boolean useBloom)
058    throws IOException {
059    throw new IllegalStateException("requestSeek cannot be called on ReversedKeyValueHeap");
060  }
061
062  @Override
063  public boolean seekToPreviousRow(ExtendedCell seekKey) throws IOException {
064    if (current == null) {
065      return false;
066    }
067    heap.add(current);
068    current = null;
069
070    KeyValueScanner scanner;
071    while ((scanner = heap.poll()) != null) {
072      Cell topKey = scanner.peek();
073      if (comparator.getComparator().compareRows(topKey, seekKey) < 0) {
074        // Row of Top KeyValue is before Seek row.
075        heap.add(scanner);
076        current = pollRealKV();
077        return current != null;
078      }
079
080      if (!scanner.seekToPreviousRow(seekKey)) {
081        this.scannersForDelayedClose.add(scanner);
082      } else {
083        heap.add(scanner);
084      }
085    }
086
087    // Heap is returning empty, scanner is done
088    return false;
089  }
090
091  @Override
092  public boolean backwardSeek(ExtendedCell seekKey) throws IOException {
093    if (current == null) {
094      return false;
095    }
096    heap.add(current);
097    current = null;
098
099    KeyValueScanner scanner;
100    while ((scanner = heap.poll()) != null) {
101      Cell topKey = scanner.peek();
102      if (
103        (CellUtil.matchingRows(seekKey, topKey)
104          && comparator.getComparator().compare(seekKey, topKey) <= 0)
105          || comparator.getComparator().compareRows(seekKey, topKey) > 0
106      ) {
107        heap.add(scanner);
108        current = pollRealKV();
109        return current != null;
110      }
111      if (!scanner.backwardSeek(seekKey)) {
112        this.scannersForDelayedClose.add(scanner);
113      } else {
114        heap.add(scanner);
115      }
116    }
117    return false;
118  }
119
120  @Override
121  public ExtendedCell next() throws IOException {
122    if (this.current == null) {
123      return null;
124    }
125    ExtendedCell kvReturn = this.current.next();
126    ExtendedCell kvNext = this.current.peek();
127    if (kvNext == null || this.comparator.kvComparator.compareRows(kvNext, kvReturn) > 0) {
128      if (this.current.seekToPreviousRow(kvReturn)) {
129        this.heap.add(this.current);
130      } else {
131        this.scannersForDelayedClose.add(this.current);
132      }
133      this.current = null;
134      this.current = pollRealKV();
135    } else {
136      KeyValueScanner topScanner = this.heap.peek();
137      if (topScanner != null && this.comparator.compare(this.current, topScanner) > 0) {
138        this.heap.add(this.current);
139        this.current = null;
140        this.current = pollRealKV();
141      }
142    }
143    return kvReturn;
144  }
145
146  /**
147   * In ReversedKVScannerComparator, we compare the row of scanners' peek values first, sort bigger
148   * one before the smaller one. Then compare the KeyValue if they have the equal row, sort smaller
149   * one before the bigger one
150   */
151  private static class ReversedKVScannerComparator extends KVScannerComparator {
152
153    /**
154     * Constructor
155     */
156    public ReversedKVScannerComparator(CellComparator kvComparator) {
157      super(kvComparator);
158    }
159
160    @Override
161    public int compare(KeyValueScanner left, KeyValueScanner right) {
162      int rowComparison = compareRows(left.peek(), right.peek());
163      if (rowComparison != 0) {
164        return -rowComparison;
165      }
166      return super.compare(left, right);
167    }
168
169    /**
170     * Compares rows of two KeyValue
171     * @return less than 0 if left is smaller, 0 if equal etc..
172     */
173    public int compareRows(Cell left, Cell right) {
174      return super.kvComparator.compareRows(left, right);
175    }
176  }
177
178  @Override
179  public boolean seekToLastRow() throws IOException {
180    throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
181  }
182}