001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.querymatcher; 019 020import java.io.IOException; 021import java.util.NavigableSet; 022import org.apache.hadoop.hbase.Cell; 023import org.apache.hadoop.hbase.CellUtil; 024import org.apache.hadoop.hbase.HConstants; 025import org.apache.hadoop.hbase.PrivateCellUtil; 026import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode; 027import org.apache.yetus.audience.InterfaceAudience; 028 029/** 030 * This class is used for the tracking and enforcement of columns and numbers of versions during the 031 * course of a Get or Scan operation, when explicit column qualifiers have been asked for in the 032 * query. With a little magic (see {@link ScanQueryMatcher}), we can use this matcher for both scans 033 * and gets. The main difference is 'next' and 'done' collapse for the scan case (since we see all 034 * columns in order), and we only reset between rows. 035 * <p> 036 * This class is utilized by {@link ScanQueryMatcher} mainly through two methods: 037 * <ul> 038 * <li>{@link #checkColumn} is called when a Put satisfies all other conditions of the query.</li> 039 * <li>{@link #getNextRowOrNextColumn} is called whenever ScanQueryMatcher believes that the current 040 * column should be skipped (by timestamp, filter etc.)</li> 041 * </ul> 042 * <p> 043 * These two methods returns a 044 * {@link org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode} to define 045 * what action should be taken. 046 * <p> 047 * This class is NOT thread-safe as queries are never multi-threaded 048 */ 049@InterfaceAudience.Private 050public class ExplicitColumnTracker implements ColumnTracker { 051 052 private final int maxVersions; 053 private final int minVersions; 054 055 /** 056 * Contains the list of columns that the ExplicitColumnTracker is tracking. Each ColumnCount 057 * instance also tracks how many versions of the requested column have been returned. 058 */ 059 private final ColumnCount[] columns; 060 private int index; 061 private ColumnCount column; 062 /** 063 * Keeps track of the latest timestamp included for current column. Used to eliminate duplicates. 064 */ 065 private long latestTSOfCurrentColumn; 066 private long oldestStamp; 067 068 /** 069 * Default constructor. 070 * @param columns columns specified user in query 071 * @param minVersions minimum number of versions to keep 072 * @param maxVersions maximum versions to return per column 073 * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL 074 */ 075 public ExplicitColumnTracker(NavigableSet<byte[]> columns, int minVersions, int maxVersions, 076 long oldestUnexpiredTS) { 077 this.maxVersions = maxVersions; 078 this.minVersions = minVersions; 079 this.oldestStamp = oldestUnexpiredTS; 080 this.columns = new ColumnCount[columns.size()]; 081 int i = 0; 082 for (byte[] column : columns) { 083 this.columns[i++] = new ColumnCount(column); 084 } 085 reset(); 086 } 087 088 /** 089 * Done when there are no more columns to match against. 090 */ 091 @Override 092 public boolean done() { 093 return this.index >= columns.length; 094 } 095 096 @Override 097 public ColumnCount getColumnHint() { 098 return this.column; 099 } 100 101 /** 102 * {@inheritDoc} 103 */ 104 @Override 105 public ScanQueryMatcher.MatchCode checkColumn(Cell cell, byte type) { 106 // delete markers should never be passed to an 107 // *Explicit*ColumnTracker 108 assert !PrivateCellUtil.isDelete(type); 109 do { 110 // No more columns left, we are done with this query 111 if (done()) { 112 return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row 113 } 114 115 // No more columns to match against, done with storefile 116 if (this.column == null) { 117 return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row 118 } 119 120 // Compare specific column to current column 121 int ret = CellUtil.compareQualifiers(cell, column.getBuffer(), column.getOffset(), 122 column.getLength()); 123 124 // Column Matches. Return include code. The caller would call checkVersions 125 // to limit the number of versions. 126 if (ret == 0) { 127 return ScanQueryMatcher.MatchCode.INCLUDE; 128 } 129 130 resetTS(); 131 132 if (ret < 0) { 133 // The current KV is smaller than the column the ExplicitColumnTracker 134 // is interested in, so seek to that column of interest. 135 return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; 136 } 137 138 // The current KV is bigger than the column the ExplicitColumnTracker 139 // is interested in. That means there is no more data for the column 140 // of interest. Advance the ExplicitColumnTracker state to next 141 // column of interest, and check again. 142 if (ret > 0) { 143 ++this.index; 144 if (done()) { 145 // No more to match, do not include, done with this row. 146 return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row 147 } 148 // This is the recursive case. 149 this.column = this.columns[this.index]; 150 } 151 } while (true); 152 } 153 154 @Override 155 public ScanQueryMatcher.MatchCode checkVersions(Cell cell, long timestamp, byte type, 156 boolean ignoreCount) throws IOException { 157 assert !PrivateCellUtil.isDelete(type); 158 if (ignoreCount) { 159 return ScanQueryMatcher.MatchCode.INCLUDE; 160 } 161 // Check if it is a duplicate timestamp 162 if (sameAsPreviousTS(timestamp)) { 163 // If duplicate, skip this Key 164 return ScanQueryMatcher.MatchCode.SKIP; 165 } 166 int count = this.column.increment(); 167 if (count >= maxVersions || (count >= minVersions && isExpired(timestamp))) { 168 // Done with versions for this column 169 ++this.index; 170 resetTS(); 171 if (done()) { 172 // We have served all the requested columns. 173 this.column = null; 174 return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW; 175 } 176 // We are done with current column; advance to next column 177 // of interest. 178 this.column = this.columns[this.index]; 179 return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL; 180 } 181 setTS(timestamp); 182 return ScanQueryMatcher.MatchCode.INCLUDE; 183 } 184 185 // Called between every row. 186 @Override 187 public void reset() { 188 this.index = 0; 189 this.column = this.columns[this.index]; 190 for (ColumnCount col : this.columns) { 191 col.setCount(0); 192 } 193 resetTS(); 194 } 195 196 private void resetTS() { 197 latestTSOfCurrentColumn = HConstants.LATEST_TIMESTAMP; 198 } 199 200 private void setTS(long timestamp) { 201 latestTSOfCurrentColumn = timestamp; 202 } 203 204 private boolean sameAsPreviousTS(long timestamp) { 205 return timestamp == latestTSOfCurrentColumn; 206 } 207 208 private boolean isExpired(long timestamp) { 209 return timestamp < oldestStamp; 210 } 211 212 @Override 213 public void doneWithColumn(Cell cell) { 214 while (this.column != null) { 215 int compare = CellUtil.compareQualifiers(cell, column.getBuffer(), column.getOffset(), 216 column.getLength()); 217 resetTS(); 218 if (compare >= 0) { 219 ++this.index; 220 if (done()) { 221 // Will not hit any more columns in this storefile 222 this.column = null; 223 } else { 224 this.column = this.columns[this.index]; 225 } 226 if (compare > 0) { 227 continue; 228 } 229 } 230 return; 231 } 232 } 233 234 @Override 235 public MatchCode getNextRowOrNextColumn(Cell cell) { 236 doneWithColumn(cell); 237 238 if (getColumnHint() == null) { 239 return MatchCode.SEEK_NEXT_ROW; 240 } else { 241 return MatchCode.SEEK_NEXT_COL; 242 } 243 } 244 245 @Override 246 public boolean isDone(long timestamp) { 247 return minVersions <= 0 && isExpired(timestamp); 248 } 249 250 @Override 251 public void beforeShipped() throws IOException { 252 // do nothing 253 } 254}