001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.Collection;
024import java.util.List;
025import java.util.Optional;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import java.util.function.Predicate;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.ExtendedCell;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.Durability;
038import org.apache.hadoop.hbase.client.Get;
039import org.apache.hadoop.hbase.client.Put;
040import org.apache.hadoop.hbase.client.Result;
041import org.apache.hadoop.hbase.client.Scan;
042import org.apache.hadoop.hbase.client.Table;
043import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
044import org.apache.hadoop.hbase.coprocessor.ObserverContext;
045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
046import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
047import org.apache.hadoop.hbase.coprocessor.RegionObserver;
048import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
049import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
050import org.apache.hadoop.hbase.regionserver.InternalScanner;
051import org.apache.hadoop.hbase.regionserver.Region;
052import org.apache.hadoop.hbase.regionserver.ScanType;
053import org.apache.hadoop.hbase.regionserver.ScannerContext;
054import org.apache.hadoop.hbase.regionserver.Store;
055import org.apache.hadoop.hbase.regionserver.StoreScanner;
056import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
057import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
058import org.apache.hadoop.hbase.testclassification.MediumTests;
059import org.apache.hadoop.hbase.testclassification.MiscTests;
060import org.apache.hadoop.hbase.wal.WALEdit;
061import org.junit.AfterClass;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066import org.junit.runner.RunWith;
067import org.junit.runners.Parameterized;
068import org.junit.runners.Parameterized.Parameters;
069
070@Category({ MiscTests.class, MediumTests.class })
071@RunWith(Parameterized.class)
072public class TestCoprocessorScanPolicy {
073
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076    HBaseClassTestRule.forClass(TestCoprocessorScanPolicy.class);
077
078  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
079  private static final byte[] F = Bytes.toBytes("fam");
080  private static final byte[] Q = Bytes.toBytes("qual");
081  private static final byte[] R = Bytes.toBytes("row");
082
083  @BeforeClass
084  public static void setUpBeforeClass() throws Exception {
085    Configuration conf = TEST_UTIL.getConfiguration();
086    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ScanObserver.class.getName());
087    TEST_UTIL.startMiniCluster();
088  }
089
090  @AfterClass
091  public static void tearDownAfterClass() throws Exception {
092    TEST_UTIL.shutdownMiniCluster();
093  }
094
095  @Parameters
096  public static Collection<Object[]> parameters() {
097    return HBaseCommonTestingUtil.BOOLEAN_PARAMETERIZED;
098  }
099
100  public TestCoprocessorScanPolicy(boolean parallelSeekEnable) {
101    TEST_UTIL.getMiniHBaseCluster().getConf()
102      .setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, parallelSeekEnable);
103  }
104
105  @Test
106  public void testBaseCases() throws Exception {
107    TableName tableName = TableName.valueOf("baseCases");
108    if (TEST_UTIL.getAdmin().tableExists(tableName)) {
109      TEST_UTIL.deleteTable(tableName);
110    }
111    Table t = TEST_UTIL.createTable(tableName, F, 10);
112    // insert 3 versions
113    long now = EnvironmentEdgeManager.currentTime();
114    Put p = new Put(R);
115    p.addColumn(F, Q, now, Q);
116    t.put(p);
117    p = new Put(R);
118    p.addColumn(F, Q, now + 1, Q);
119    t.put(p);
120    p = new Put(R);
121    p.addColumn(F, Q, now + 2, Q);
122    t.put(p);
123
124    Get g = new Get(R);
125    g.readVersions(10);
126    Result r = t.get(g);
127    assertEquals(3, r.size());
128
129    TEST_UTIL.flush(tableName);
130    TEST_UTIL.compact(tableName, true);
131
132    // still visible after a flush/compaction
133    r = t.get(g);
134    assertEquals(3, r.size());
135
136    // set the version override to 2
137    p = new Put(R);
138    p.setAttribute("versions", new byte[] {});
139    p.addColumn(F, tableName.getName(), Bytes.toBytes(2));
140    t.put(p);
141
142    // only 2 versions now
143    r = t.get(g);
144    assertEquals(2, r.size());
145
146    TEST_UTIL.flush(tableName);
147    TEST_UTIL.compact(tableName, true);
148
149    // still 2 versions after a flush/compaction
150    r = t.get(g);
151    assertEquals(2, r.size());
152
153    // insert a new version
154    p.addColumn(F, Q, now + 3, Q);
155    t.put(p);
156
157    // still 2 versions
158    r = t.get(g);
159    assertEquals(2, r.size());
160
161    t.close();
162  }
163
164  @Test
165  public void testTTL() throws Exception {
166    TableName tableName = TableName.valueOf("testTTL");
167    if (TEST_UTIL.getAdmin().tableExists(tableName)) {
168      TEST_UTIL.deleteTable(tableName);
169    }
170    Table t = TEST_UTIL.createTable(tableName, F, 10);
171    long now = EnvironmentEdgeManager.currentTime();
172    ManualEnvironmentEdge me = new ManualEnvironmentEdge();
173    me.setValue(now);
174    EnvironmentEdgeManagerTestHelper.injectEdge(me);
175    // 2s in the past
176    long ts = now - 2000;
177
178    Put p = new Put(R);
179    p.addColumn(F, Q, ts, Q);
180    t.put(p);
181    p = new Put(R);
182    p.addColumn(F, Q, ts + 1, Q);
183    t.put(p);
184
185    // Set the TTL override to 3s
186    p = new Put(R);
187    p.setAttribute("ttl", new byte[] {});
188    p.addColumn(F, tableName.getName(), Bytes.toBytes(3000L));
189    t.put(p);
190    // these two should still be there
191    Get g = new Get(R);
192    g.readAllVersions();
193    Result r = t.get(g);
194    // still there?
195    assertEquals(2, r.size());
196
197    TEST_UTIL.flush(tableName);
198    TEST_UTIL.compact(tableName, true);
199
200    g = new Get(R);
201    g.readAllVersions();
202    r = t.get(g);
203    // still there?
204    assertEquals(2, r.size());
205
206    // roll time forward 2s.
207    me.setValue(now + 2000);
208    // now verify that data eventually does expire
209    g = new Get(R);
210    g.readAllVersions();
211    r = t.get(g);
212    // should be gone now
213    assertEquals(0, r.size());
214    t.close();
215    EnvironmentEdgeManager.reset();
216  }
217
218  public static class ScanObserver implements RegionCoprocessor, RegionObserver {
219    private final ConcurrentMap<TableName, Long> ttls = new ConcurrentHashMap<>();
220    private final ConcurrentMap<TableName, Integer> versions = new ConcurrentHashMap<>();
221
222    @Override
223    public Optional<RegionObserver> getRegionObserver() {
224      return Optional.of(this);
225    }
226
227    // lame way to communicate with the coprocessor,
228    // since it is loaded by a different class loader
229    @Override
230    public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
231      final Put put, final WALEdit edit, final Durability durability) throws IOException {
232      if (put.getAttribute("ttl") != null) {
233        Cell cell = put.getFamilyCellMap().values().stream().findFirst().get().get(0);
234        ttls.put(
235          TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
236            cell.getQualifierLength())),
237          Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
238        c.bypass();
239      } else if (put.getAttribute("versions") != null) {
240        Cell cell = put.getFamilyCellMap().values().stream().findFirst().get().get(0);
241        versions.put(
242          TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
243            cell.getQualifierLength())),
244          Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
245        c.bypass();
246      }
247    }
248
249    private InternalScanner wrap(Store store, InternalScanner scanner) {
250      Long ttl = this.ttls.get(store.getTableName());
251      Integer version = this.versions.get(store.getTableName());
252      return new DelegatingInternalScanner(scanner) {
253
254        private byte[] row;
255
256        private byte[] qualifier;
257
258        private int count;
259
260        private Predicate<Cell> checkTtl(long now, long ttl) {
261          return c -> now - c.getTimestamp() > ttl;
262        }
263
264        private Predicate<Cell> checkVersion(Cell firstCell, int version) {
265          if (version == 0) {
266            return c -> true;
267          } else {
268            if (row == null || !CellUtil.matchingRows(firstCell, row)) {
269              row = CellUtil.cloneRow(firstCell);
270              // reset qualifier as there is a row change
271              qualifier = null;
272            }
273            return c -> {
274              if (qualifier != null && CellUtil.matchingQualifier(c, qualifier)) {
275                if (count >= version) {
276                  return true;
277                }
278                count++;
279                return false;
280              } else { // qualifier switch
281                qualifier = CellUtil.cloneQualifier(c);
282                count = 1;
283                return false;
284              }
285            };
286          }
287        }
288
289        @Override
290        public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext)
291          throws IOException {
292          boolean moreRows = scanner.next(result, scannerContext);
293          if (result.isEmpty()) {
294            return moreRows;
295          }
296          long now = EnvironmentEdgeManager.currentTime();
297          Predicate<Cell> predicate = null;
298          if (ttl != null) {
299            predicate = checkTtl(now, ttl);
300          }
301          if (version != null) {
302            Predicate<Cell> vp = checkVersion((Cell) result.get(0), version);
303            if (predicate != null) {
304              predicate = predicate.and(vp);
305            } else {
306              predicate = vp;
307            }
308          }
309          if (predicate != null) {
310            ((List<Cell>) result).removeIf(predicate);
311          }
312          return moreRows;
313        }
314      };
315    }
316
317    @Override
318    public InternalScanner preFlush(ObserverContext<? extends RegionCoprocessorEnvironment> c,
319      Store store, InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
320      return wrap(store, scanner);
321    }
322
323    @Override
324    public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c,
325      Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
326      CompactionRequest request) throws IOException {
327      return wrap(store, scanner);
328    }
329
330    @Override
331    public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get,
332      List<Cell> result) throws IOException {
333      TableName tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName();
334      Long ttl = this.ttls.get(tableName);
335      if (ttl != null) {
336        get.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, get.getTimeRange().getMax());
337      }
338      Integer version = this.versions.get(tableName);
339      if (version != null) {
340        get.readVersions(version);
341      }
342    }
343
344    @Override
345    public void preScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c, Scan scan)
346      throws IOException {
347      Region region = c.getEnvironment().getRegion();
348      TableName tableName = region.getTableDescriptor().getTableName();
349      Long ttl = this.ttls.get(tableName);
350      if (ttl != null) {
351        scan.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, scan.getTimeRange().getMax());
352      }
353      Integer version = this.versions.get(tableName);
354      if (version != null) {
355        scan.readVersions(version);
356      }
357    }
358  }
359}