/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.regionserver.querymatcher;
019
020import java.io.IOException;
021import java.util.HashMap;
022import java.util.Map;
023import java.util.NavigableMap;
024import java.util.NavigableSet;
025import java.util.SortedMap;
026import java.util.SortedSet;
027import java.util.TreeMap;
028import java.util.TreeSet;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellComparator;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.KeyValue.Type;
033import org.apache.hadoop.hbase.PrivateCellUtil;
034import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
035import org.apache.yetus.audience.InterfaceAudience;
036
037/**
038 * A tracker both implementing ColumnTracker and DeleteTracker, used for mvcc-sensitive scanning. We
039 * should make sure in one QueryMatcher the ColumnTracker and DeleteTracker is the same instance.
040 */
041@InterfaceAudience.Private
042public class NewVersionBehaviorTracker implements ColumnTracker, DeleteTracker {
043
044  private byte[] lastCqArray;
045  private int lastCqLength;
046  private int lastCqOffset;
047  private long lastCqTs;
048  private long lastCqMvcc;
049  private byte lastCqType;
050  private int columnIndex;
051  private int countCurrentCol;
052
053  protected int maxVersions;
054  private int resultMaxVersions;
055  private byte[][] columns;
056  private int minVersions;
057  private long oldestStamp;
058  private CellComparator comparator;
059
060  // These two maps have same structure.
061  // Each node is a versions deletion (DeleteFamily or DeleteColumn). Key is the mvcc of the marker,
062  // value is a data structure which contains infos we need that happens before this node's mvcc and
063  // after the previous node's mvcc. The last node is a special node whose key is max_long that
064  // saves infos after last deletion. See DeleteVersionsNode's comments for details.
065  // The delColMap is constructed and used for each cq, and thedelFamMap is constructed when cq is
066  // null and saving family-level delete markers. Each time the cq is changed, we should
067  // reconstruct delColMap as a deep copy of delFamMap.
068  protected NavigableMap<Long, DeleteVersionsNode> delColMap = new TreeMap<>();
069  protected NavigableMap<Long, DeleteVersionsNode> delFamMap = new TreeMap<>();
070
071  /**
072   * Note maxVersion and minVersion must set according to cf's conf, not user's scan parameter.
073   * @param columns           columns specified user in query
074   * @param comparartor       the cell comparator
075   * @param minVersion        The minimum number of versions to keep(used when TTL is set).
076   * @param maxVersion        The maximum number of versions in CF's conf
077   * @param resultMaxVersions maximum versions to return per column, which may be different from
078   *                          maxVersion
079   * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
080   */
081  public NewVersionBehaviorTracker(NavigableSet<byte[]> columns, CellComparator comparartor,
082    int minVersion, int maxVersion, int resultMaxVersions, long oldestUnexpiredTS) {
083    this.maxVersions = maxVersion;
084    this.minVersions = minVersion;
085    this.resultMaxVersions = resultMaxVersions;
086    this.oldestStamp = oldestUnexpiredTS;
087    if (columns != null && columns.size() > 0) {
088      this.columns = new byte[columns.size()][];
089      int i = 0;
090      for (byte[] column : columns) {
091        this.columns[i++] = column;
092      }
093    }
094    this.comparator = comparartor;
095    reset();
096  }
097
098  @Override
099  public void beforeShipped() throws IOException {
100    // Do nothing
101  }
102
103  /**
104   * A data structure which contains infos we need that happens before this node's mvcc and after
105   * the previous node's mvcc. A node means there is a version deletion at the mvcc and ts.
106   */
107  protected class DeleteVersionsNode {
108    public long ts;
109    public long mvcc;
110
111    // <timestamp, set<mvcc>>
112    // Key is ts of version deletes, value is its mvccs.
113    // We may delete more than one time for a version.
114    private Map<Long, SortedSet<Long>> deletesMap = new HashMap<>();
115
116    // <mvcc, set<mvcc>>
117    // Key is mvcc of version deletes, value is mvcc of visible puts before the delete effect.
118    private NavigableMap<Long, SortedSet<Long>> mvccCountingMap = new TreeMap<>();
119
120    protected DeleteVersionsNode(long ts, long mvcc) {
121      this.ts = ts;
122      this.mvcc = mvcc;
123      mvccCountingMap.put(Long.MAX_VALUE, new TreeSet<Long>());
124    }
125
126    protected DeleteVersionsNode() {
127      this(Long.MIN_VALUE, Long.MAX_VALUE);
128    }
129
130    public void addVersionDelete(Cell cell) {
131      SortedSet<Long> set = deletesMap.get(cell.getTimestamp());
132      if (set == null) {
133        set = new TreeSet<>();
134        deletesMap.put(cell.getTimestamp(), set);
135      }
136      set.add(cell.getSequenceId());
137      // The init set should be the puts whose mvcc is smaller than this Delete. Because
138      // there may be some Puts masked by them. The Puts whose mvcc is larger than this Delete can
139      // not be copied to this node because we may delete one version and the oldest put may not be
140      // masked.
141      SortedSet<Long> nextValue = mvccCountingMap.ceilingEntry(cell.getSequenceId()).getValue();
142      SortedSet<Long> thisValue = new TreeSet<>(nextValue.headSet(cell.getSequenceId()));
143      mvccCountingMap.put(cell.getSequenceId(), thisValue);
144    }
145
146    protected DeleteVersionsNode getDeepCopy() {
147      DeleteVersionsNode node = new DeleteVersionsNode(ts, mvcc);
148      for (Map.Entry<Long, SortedSet<Long>> e : deletesMap.entrySet()) {
149        node.deletesMap.put(e.getKey(), new TreeSet<>(e.getValue()));
150      }
151      for (Map.Entry<Long, SortedSet<Long>> e : mvccCountingMap.entrySet()) {
152        node.mvccCountingMap.put(e.getKey(), new TreeSet<>(e.getValue()));
153      }
154      return node;
155    }
156  }
157
158  /**
159   * Reset the map if it is different with the last Cell. Save the cq array/offset/length for next
160   * Cell.
161   * @return If this put has duplicate ts with last cell, return the mvcc of last cell. Else return
162   *         MAX_VALUE.
163   */
164  protected long prepare(Cell cell) {
165    if (isColumnQualifierChanged(cell)) {
166      // The last cell is family-level delete and this is not, or the cq is changed,
167      // we should construct delColMap as a deep copy of delFamMap.
168      delColMap.clear();
169      for (Map.Entry<Long, DeleteVersionsNode> e : delFamMap.entrySet()) {
170        delColMap.put(e.getKey(), e.getValue().getDeepCopy());
171      }
172      countCurrentCol = 0;
173    } else if (
174      !PrivateCellUtil.isDelete(lastCqType) && lastCqType == cell.getTypeByte()
175        && lastCqTs == cell.getTimestamp()
176    ) {
177      // Put with duplicate timestamp, ignore.
178      return lastCqMvcc;
179    }
180    lastCqArray = cell.getQualifierArray();
181    lastCqOffset = cell.getQualifierOffset();
182    lastCqLength = cell.getQualifierLength();
183    lastCqTs = cell.getTimestamp();
184    lastCqMvcc = cell.getSequenceId();
185    lastCqType = cell.getTypeByte();
186    return Long.MAX_VALUE;
187  }
188
189  private boolean isColumnQualifierChanged(Cell cell) {
190    if (
191      delColMap.isEmpty() && lastCqArray == null && cell.getQualifierLength() == 0
192        && (PrivateCellUtil.isDeleteColumns(cell) || PrivateCellUtil.isDeleteColumnVersion(cell))
193    ) {
194      // for null columnQualifier
195      return true;
196    }
197    return !PrivateCellUtil.matchingQualifier(cell, lastCqArray, lastCqOffset, lastCqLength);
198  }
199
200  // DeleteTracker
201  @Override
202  public void add(Cell cell) {
203    prepare(cell);
204    byte type = cell.getTypeByte();
205    switch (Type.codeToType(type)) {
206      // By the order of seen. We put null cq at first.
207      case DeleteFamily: // Delete all versions of all columns of the specified family
208        delFamMap.put(cell.getSequenceId(),
209          new DeleteVersionsNode(cell.getTimestamp(), cell.getSequenceId()));
210        break;
211      case DeleteFamilyVersion: // Delete all columns of the specified family and specified version
212        delFamMap.ceilingEntry(cell.getSequenceId()).getValue().addVersionDelete(cell);
213        break;
214
215      // These two kinds of markers are mix with Puts.
216      case DeleteColumn: // Delete all versions of the specified column
217        delColMap.put(cell.getSequenceId(),
218          new DeleteVersionsNode(cell.getTimestamp(), cell.getSequenceId()));
219        break;
220      case Delete: // Delete the specified version of the specified column.
221        delColMap.ceilingEntry(cell.getSequenceId()).getValue().addVersionDelete(cell);
222        break;
223      default:
224        throw new AssertionError("Unknown delete marker type for " + cell);
225    }
226  }
227
228  /**
229   * This method is not idempotent, we will save some info to judge VERSION_MASKED.
230   * @param cell - current cell to check if deleted by a previously seen delete
231   * @return We don't distinguish DeleteColumn and DeleteFamily. We only return code for column.
232   */
233  @Override
234  public DeleteResult isDeleted(Cell cell) {
235    long duplicateMvcc = prepare(cell);
236
237    for (Map.Entry<Long, DeleteVersionsNode> e : delColMap.tailMap(cell.getSequenceId())
238      .entrySet()) {
239      DeleteVersionsNode node = e.getValue();
240      long deleteMvcc = Long.MAX_VALUE;
241      SortedSet<Long> deleteVersionMvccs = node.deletesMap.get(cell.getTimestamp());
242      if (deleteVersionMvccs != null) {
243        SortedSet<Long> tail = deleteVersionMvccs.tailSet(cell.getSequenceId());
244        if (!tail.isEmpty()) {
245          deleteMvcc = tail.first();
246        }
247      }
248      SortedMap<Long, SortedSet<Long>> subMap = node.mvccCountingMap.subMap(cell.getSequenceId(),
249        true, Math.min(duplicateMvcc, deleteMvcc), true);
250      for (Map.Entry<Long, SortedSet<Long>> seg : subMap.entrySet()) {
251        if (seg.getValue().size() >= maxVersions) {
252          return DeleteResult.VERSION_MASKED;
253        }
254        seg.getValue().add(cell.getSequenceId());
255      }
256      if (deleteMvcc < Long.MAX_VALUE) {
257        return DeleteResult.VERSION_DELETED;
258      }
259
260      if (cell.getTimestamp() <= node.ts) {
261        return DeleteResult.COLUMN_DELETED;
262      }
263    }
264    if (duplicateMvcc < Long.MAX_VALUE) {
265      return DeleteResult.VERSION_MASKED;
266    }
267    return DeleteResult.NOT_DELETED;
268  }
269
270  @Override
271  public boolean isEmpty() {
272    return delColMap.size() == 1 && delColMap.get(Long.MAX_VALUE).mvccCountingMap.size() == 1
273      && delFamMap.size() == 1 && delFamMap.get(Long.MAX_VALUE).mvccCountingMap.size() == 1;
274  }
275
276  @Override
277  public void update() {
278    // ignore
279  }
280
281  // ColumnTracker
282
283  @Override
284  public MatchCode checkColumn(Cell cell, byte type) throws IOException {
285    if (columns == null) {
286      return MatchCode.INCLUDE;
287    }
288
289    while (!done()) {
290      int c =
291        CellUtil.compareQualifiers(cell, columns[columnIndex], 0, columns[columnIndex].length);
292      if (c < 0) {
293        return MatchCode.SEEK_NEXT_COL;
294      }
295
296      if (c == 0) {
297        // We drop old version in #isDeleted, so here we must return INCLUDE.
298        return MatchCode.INCLUDE;
299      }
300
301      columnIndex++;
302    }
303    // No more columns left, we are done with this query
304    return MatchCode.SEEK_NEXT_ROW;
305  }
306
307  @Override
308  public MatchCode checkVersions(Cell cell, long timestamp, byte type, boolean ignoreCount)
309    throws IOException {
310    assert !PrivateCellUtil.isDelete(type);
311    // We drop old version in #isDeleted, so here we won't SKIP because of versioning. But we should
312    // consider TTL.
313    if (ignoreCount) {
314      return MatchCode.INCLUDE;
315    }
316    countCurrentCol++;
317    if (timestamp < this.oldestStamp) {
318      if (countCurrentCol == minVersions) {
319        return MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
320      }
321      if (countCurrentCol > minVersions) {
322        // This may not be reached, only for safety.
323        return MatchCode.SEEK_NEXT_COL;
324      }
325    }
326
327    if (countCurrentCol == resultMaxVersions) {
328      // We have enough number of versions for user's requirement.
329      return MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
330    }
331    if (countCurrentCol > resultMaxVersions) {
332      // This may not be reached, only for safety
333      return MatchCode.SEEK_NEXT_COL;
334    }
335    return MatchCode.INCLUDE;
336  }
337
338  @Override
339  public void reset() {
340    delColMap.clear();
341    delFamMap.clear();
342    lastCqArray = null;
343    lastCqLength = 0;
344    lastCqOffset = 0;
345    lastCqTs = Long.MIN_VALUE;
346    lastCqMvcc = 0;
347    lastCqType = 0;
348    columnIndex = 0;
349    countCurrentCol = 0;
350    resetInternal();
351  }
352
353  protected void resetInternal() {
354    delFamMap.put(Long.MAX_VALUE, new DeleteVersionsNode());
355  }
356
357  @Override
358  public boolean done() {
359    return columns != null && columnIndex >= columns.length;
360  }
361
362  @Override
363  public ColumnCount getColumnHint() {
364    if (columns != null) {
365      if (columnIndex < columns.length) {
366        return new ColumnCount(columns[columnIndex]);
367      }
368    }
369    return null;
370  }
371
372  @Override
373  public MatchCode getNextRowOrNextColumn(Cell cell) {
374    // TODO maybe we can optimize.
375    return MatchCode.SEEK_NEXT_COL;
376  }
377
378  @Override
379  public boolean isDone(long timestamp) {
380    // We can not skip Cells with small ts.
381    return false;
382  }
383
384  @Override
385  public CellComparator getCellComparator() {
386    return this.comparator;
387  }
388
389}