/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

/**
 * Verifies that a region coprocessor ({@link ScanObserver}) can override the effective TTL and
 * max-versions of a table for gets, scans, flushes and compactions.
 * <p>
 * The test is parameterized on a boolean that is written to
 * {@link StoreScanner#STORESCANNER_PARALLEL_SEEK_ENABLE} in the mini cluster configuration.
 */
@Category({ MiscTests.class, MediumTests.class })
@RunWith(Parameterized.class)
public class TestCoprocessorScanPolicy {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCoprocessorScanPolicy.class);

  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final byte[] F = Bytes.toBytes("fam");
  private static final byte[] Q = Bytes.toBytes("qual");
  private static final byte[] R = Bytes.toBytes("row");

  /** Registers {@link ScanObserver} as a region coprocessor and starts the mini cluster. */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ScanObserver.class.getName());
    TEST_UTIL.startMiniCluster();
  }

  /** Shuts the mini cluster down once all parameterized runs have finished. */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /** Supplies the parameter sets for the {@code parallelSeekEnable} constructor argument. */
  @Parameters
  public static Collection<Object[]> parameters() {
    return HBaseCommonTestingUtil.BOOLEAN_PARAMETERIZED;
  }

  /**
   * @param parallelSeekEnable value stored under
   *                           {@link StoreScanner#STORESCANNER_PARALLEL_SEEK_ENABLE} in the running
   *                           mini cluster's configuration
   */
  public TestCoprocessorScanPolicy(boolean parallelSeekEnable) {
    TEST_UTIL.getMiniHBaseCluster().getConf()
      .setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, parallelSeekEnable);
  }

  /**
   * Writes three versions of one cell, then lowers the version limit to two through the
   * coprocessor and checks that only two versions are returned before and after flush/compaction
   * and after a further write.
   */
  @Test
  public void testBaseCases() throws Exception {
    TableName tableName = TableName.valueOf("baseCases");
    if (TEST_UTIL.getAdmin().tableExists(tableName)) {
      TEST_UTIL.deleteTable(tableName);
    }
    // try-with-resources so the table is closed even when an assertion fails
    try (Table t = TEST_UTIL.createTable(tableName, F, 10)) {
      // insert 3 versions
      long now = EnvironmentEdgeManager.currentTime();
      Put p = new Put(R);
      p.addColumn(F, Q, now, Q);
      t.put(p);
      p = new Put(R);
      p.addColumn(F, Q, now + 1, Q);
      t.put(p);
      p = new Put(R);
      p.addColumn(F, Q, now + 2, Q);
      t.put(p);

      Get g = new Get(R);
      g.readVersions(10);
      Result r = t.get(g);
      assertEquals(3, r.size());

      TEST_UTIL.flush(tableName);
      TEST_UTIL.compact(tableName, true);

      // still visible after a flush/compaction
      r = t.get(g);
      assertEquals(3, r.size());

      // set the version override to 2
      p = new Put(R);
      p.setAttribute("versions", new byte[] {});
      p.addColumn(F, tableName.getName(), Bytes.toBytes(2));
      t.put(p);

      // only 2 versions now
      r = t.get(g);
      assertEquals(2, r.size());

      TEST_UTIL.flush(tableName);
      TEST_UTIL.compact(tableName, true);

      // still 2 versions after a flush/compaction
      r = t.get(g);
      assertEquals(2, r.size());

      // insert a new version; this must be a fresh Put, because the previous one still carries
      // the "versions" attribute and would be intercepted and bypassed by ScanObserver.prePut
      p = new Put(R);
      p.addColumn(F, Q, now + 3, Q);
      t.put(p);

      // still 2 versions
      r = t.get(g);
      assertEquals(2, r.size());
    }
  }

  /**
   * Writes two cells dated two seconds in the past, sets a three second TTL override through the
   * coprocessor, and checks the cells survive flush/compaction but disappear once the injected
   * clock is advanced by two seconds.
   */
  @Test
  public void testTTL() throws Exception {
    TableName tableName = TableName.valueOf("testTTL");
    if (TEST_UTIL.getAdmin().tableExists(tableName)) {
      TEST_UTIL.deleteTable(tableName);
    }
    // try-with-resources so the table is closed even when an assertion fails
    try (Table t = TEST_UTIL.createTable(tableName, F, 10)) {
      long now = EnvironmentEdgeManager.currentTime();
      ManualEnvironmentEdge me = new ManualEnvironmentEdge();
      me.setValue(now);
      EnvironmentEdgeManagerTestHelper.injectEdge(me);
      try {
        // 2s in the past
        long ts = now - 2000;

        Put p = new Put(R);
        p.addColumn(F, Q, ts, Q);
        t.put(p);
        p = new Put(R);
        p.addColumn(F, Q, ts + 1, Q);
        t.put(p);

        // Set the TTL override to 3s
        p = new Put(R);
        p.setAttribute("ttl", new byte[] {});
        p.addColumn(F, tableName.getName(), Bytes.toBytes(3000L));
        t.put(p);
        // these two should still be there
        Get g = new Get(R);
        g.readAllVersions();
        Result r = t.get(g);
        // still there?
        assertEquals(2, r.size());

        TEST_UTIL.flush(tableName);
        TEST_UTIL.compact(tableName, true);

        g = new Get(R);
        g.readAllVersions();
        r = t.get(g);
        // still there?
        assertEquals(2, r.size());

        // roll time forward 2s.
        me.setValue(now + 2000);
        // now verify that data eventually does expire
        g = new Get(R);
        g.readAllVersions();
        r = t.get(g);
        // should be gone now
        assertEquals(0, r.size());
      } finally {
        // always restore the clock so a failure here cannot leak the manual edge
        EnvironmentEdgeManager.reset();
      }
    }
  }

  /**
   * Region observer that keeps per-table TTL and max-version overrides and applies them to gets,
   * scans, flushes and compactions.
   * <p>
   * Overrides are installed by sending a {@link Put} carrying a {@code "ttl"} or
   * {@code "versions"} attribute: the first cell's qualifier names the target table and its value
   * holds the override. Such puts are bypassed and never stored.
   */
  public static class ScanObserver implements RegionCoprocessor, RegionObserver {
    private final ConcurrentMap<TableName, Long> ttls = new ConcurrentHashMap<>();
    private final ConcurrentMap<TableName, Integer> versions = new ConcurrentHashMap<>();

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    /** Returns the first cell of the first family in {@code put}. */
    private static Cell firstCell(Put put) {
      return put.getFamilyCellMap().values().stream().findFirst().get().get(0);
    }

    /** Decodes the qualifier of {@code cell} as a table name. */
    private static TableName tableFromQualifier(Cell cell) {
      return TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
        cell.getQualifierLength()));
    }

    // lame way to communicate with the coprocessor,
    // since it is loaded by a different class loader
    @Override
    public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
      final Put put, final WALEdit edit, final Durability durability) throws IOException {
      if (put.getAttribute("ttl") != null) {
        Cell cell = firstCell(put);
        ttls.put(tableFromQualifier(cell),
          Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
        // control message only; do not store it
        c.bypass();
      } else if (put.getAttribute("versions") != null) {
        Cell cell = firstCell(put);
        versions.put(tableFromQualifier(cell),
          Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
        // control message only; do not store it
        c.bypass();
      }
    }

    /**
     * Wraps {@code scanner} so that cells violating the overrides registered for the store's table
     * are dropped from every batch it returns. The overrides are read once, when the wrapper is
     * created.
     * @param store   store being flushed or compacted; supplies the table name
     * @param scanner scanner to delegate to
     * @return a filtering scanner around {@code scanner}
     */
    private InternalScanner wrap(Store store, InternalScanner scanner) {
      Long ttl = this.ttls.get(store.getTableName());
      Integer version = this.versions.get(store.getTableName());
      return new DelegatingInternalScanner(scanner) {

        // row of the most recently processed batch
        private byte[] row;

        // qualifier currently being counted within that row
        private byte[] qualifier;

        // number of cells kept so far for the current qualifier
        private int count;

        /** Removal predicate: true for cells older than {@code ttl} relative to {@code now}. */
        private Predicate<Cell> checkTtl(long now, long ttl) {
          return c -> now - c.getTimestamp() > ttl;
        }

        /**
         * Removal predicate: true for cells beyond the first {@code version} cells of each
         * qualifier. A limit of 0 removes every cell. The predicate is stateful and relies on
         * being applied to the cells in list order.
         */
        private Predicate<Cell> checkVersion(Cell firstCell, int version) {
          if (version == 0) {
            return c -> true;
          } else {
            if (row == null || !CellUtil.matchingRows(firstCell, row)) {
              row = CellUtil.cloneRow(firstCell);
              // reset qualifier as there is a row change
              qualifier = null;
            }
            return c -> {
              if (qualifier != null && CellUtil.matchingQualifier(c, qualifier)) {
                if (count >= version) {
                  return true;
                }
                count++;
                return false;
              } else { // qualifier switch
                qualifier = CellUtil.cloneQualifier(c);
                count = 1;
                return false;
              }
            };
          }
        }

        // The casts below mirror the original code: the list is treated as a list of cells so the
        // Cell-typed predicates can be applied to it.
        @SuppressWarnings("unchecked")
        @Override
        public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext)
          throws IOException {
          boolean moreRows = scanner.next(result, scannerContext);
          if (result.isEmpty()) {
            return moreRows;
          }
          long now = EnvironmentEdgeManager.currentTime();
          Predicate<Cell> predicate = null;
          if (ttl != null) {
            predicate = checkTtl(now, ttl);
          }
          if (version != null) {
            Predicate<Cell> vp = checkVersion((Cell) result.get(0), version);
            if (predicate != null) {
              // A cell must go when it is expired OR exceeds the version limit. Because of
              // short-circuiting, expired cells are not counted against the version limit.
              predicate = predicate.or(vp);
            } else {
              predicate = vp;
            }
          }
          if (predicate != null) {
            ((List<Cell>) result).removeIf(predicate);
          }
          return moreRows;
        }
      };
    }

    /** Applies the table's overrides to the cells written by a flush. */
    @Override
    public InternalScanner preFlush(ObserverContext<? extends RegionCoprocessorEnvironment> c,
      Store store, InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
      return wrap(store, scanner);
    }

    /** Applies the table's overrides to the cells written by a compaction. */
    @Override
    public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c,
      Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
      return wrap(store, scanner);
    }

    /**
     * Narrows the get's time range and version count according to the overrides registered for
     * the region's table, if any.
     */
    @Override
    public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get,
      List<Cell> result) throws IOException {
      TableName tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName();
      Long ttl = this.ttls.get(tableName);
      if (ttl != null) {
        get.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, get.getTimeRange().getMax());
      }
      Integer version = this.versions.get(tableName);
      if (version != null) {
        get.readVersions(version);
      }
    }

    /**
     * Narrows the scan's time range and version count according to the overrides registered for
     * the region's table, if any.
     */
    @Override
    public void preScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c, Scan scan)
      throws IOException {
      Region region = c.getEnvironment().getRegion();
      TableName tableName = region.getTableDescriptor().getTableName();
      Long ttl = this.ttls.get(tableName);
      if (ttl != null) {
        scan.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, scan.getTimeRange().getMax());
      }
      Integer version = this.versions.get(tableName);
      if (version != null) {
        scan.readVersions(version);
      }
    }
  }
}