001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Optional; 028import java.util.concurrent.CountDownLatch; 029import java.util.concurrent.atomic.AtomicBoolean; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.CellComparatorImpl; 033import org.apache.hadoop.hbase.ExtendedCell; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 039import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 040import org.apache.hadoop.hbase.coprocessor.ObserverContext; 041import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 043import org.apache.hadoop.hbase.coprocessor.RegionObserver; 044import org.apache.hadoop.hbase.io.hfile.BlockCache; 045import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 046import org.apache.hadoop.hbase.io.hfile.CacheConfig; 047import org.apache.hadoop.hbase.io.hfile.CachedBlock; 048import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; 049import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 050import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner; 051import org.apache.hadoop.hbase.regionserver.HRegion; 052import org.apache.hadoop.hbase.regionserver.HStore; 053import org.apache.hadoop.hbase.regionserver.InternalScanner; 054import org.apache.hadoop.hbase.regionserver.ScanType; 055import org.apache.hadoop.hbase.regionserver.ScannerContext; 056import org.apache.hadoop.hbase.regionserver.Store; 057import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 058import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 059import org.apache.hadoop.hbase.testclassification.ClientTests; 060import org.apache.hadoop.hbase.testclassification.LargeTests; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.junit.AfterClass; 063import org.junit.BeforeClass; 064import org.junit.ClassRule; 065import org.junit.Rule; 066import org.junit.Test; 067import org.junit.experimental.categories.Category; 068import org.junit.rules.TestName; 069 070import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 071 072@Category({ LargeTests.class, ClientTests.class }) 073public class TestAvoidCellReferencesIntoShippedBlocks { 074 075 @ClassRule 076 public static final HBaseClassTestRule CLASS_RULE = 077 HBaseClassTestRule.forClass(TestAvoidCellReferencesIntoShippedBlocks.class); 078 079 protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 080 static byte[][] ROWS = new byte[2][]; 081 private static byte[] ROW = Bytes.toBytes("testRow"); 082 private static byte[] ROW1 = Bytes.toBytes("testRow1"); 083 private static byte[] ROW2 = Bytes.toBytes("testRow2"); 084 private static byte[] ROW3 = Bytes.toBytes("testRow3"); 085 private static byte[] ROW4 = Bytes.toBytes("testRow4"); 086 private static byte[] ROW5 = Bytes.toBytes("testRow5"); 087 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 088 private static byte[][] FAMILIES_1 = new byte[1][0]; 089 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 090 private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1"); 091 private static byte[] data = new byte[1000]; 092 protected static int SLAVES = 1; 093 private CountDownLatch latch = new CountDownLatch(1); 094 private static CountDownLatch compactReadLatch = new CountDownLatch(1); 095 private static AtomicBoolean doScan = new AtomicBoolean(false); 096 097 @Rule 098 public TestName name = new TestName(); 099 100 /** 101 * @throws java.lang.Exception 102 */ 103 @BeforeClass 104 public static void setUpBeforeClass() throws Exception { 105 ROWS[0] = ROW; 106 ROWS[1] = ROW1; 107 Configuration conf = TEST_UTIL.getConfiguration(); 108 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 109 MultiRowMutationEndpoint.class.getName()); 110 conf.setInt("hbase.regionserver.handler.count", 20); 111 conf.setInt("hbase.bucketcache.size", 400); 112 conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); 113 conf.setInt("hbase.hstore.compactionThreshold", 7); 114 conf.setFloat("hfile.block.cache.size", 0.2f); 115 conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f); 116 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry 117 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500000); 118 FAMILIES_1[0] = FAMILY; 119 TEST_UTIL.startMiniCluster(SLAVES); 120 compactReadLatch = new CountDownLatch(1); 121 } 122 123 /** 124 * @throws java.lang.Exception 125 */ 126 @AfterClass 127 public static void tearDownAfterClass() throws Exception { 128 TEST_UTIL.shutdownMiniCluster(); 129 } 130 131 @Test 132 public void testHBase16372InCompactionWritePath() throws Exception { 133 final TableName tableName = TableName.valueOf(name.getMethodName()); 134 // Create a table with block size as 1024 135 final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, 136 CompactorRegionObserver.class.getName()); 137 try { 138 // get the block cache and region 139 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 140 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 141 HRegion region = 142 (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 143 HStore store = region.getStores().iterator().next(); 144 CacheConfig cacheConf = store.getCacheConfig(); 145 cacheConf.setCacheDataOnWrite(true); 146 cacheConf.setEvictOnClose(true); 147 final BlockCache cache = cacheConf.getBlockCache().get(); 148 // insert data. 5 Rows are added 149 Put put = new Put(ROW); 150 put.addColumn(FAMILY, QUALIFIER, data); 151 table.put(put); 152 put = new Put(ROW); 153 put.addColumn(FAMILY, QUALIFIER1, data); 154 table.put(put); 155 put = new Put(ROW1); 156 put.addColumn(FAMILY, QUALIFIER, data); 157 table.put(put); 158 // data was in memstore so don't expect any changes 159 region.flush(true); 160 put = new Put(ROW1); 161 put.addColumn(FAMILY, QUALIFIER1, data); 162 table.put(put); 163 put = new Put(ROW2); 164 put.addColumn(FAMILY, QUALIFIER, data); 165 table.put(put); 166 put = new Put(ROW2); 167 put.addColumn(FAMILY, QUALIFIER1, data); 168 table.put(put); 169 // data was in memstore so don't expect any changes 170 region.flush(true); 171 put = new Put(ROW3); 172 put.addColumn(FAMILY, QUALIFIER, data); 173 table.put(put); 174 put = new Put(ROW3); 175 put.addColumn(FAMILY, QUALIFIER1, data); 176 table.put(put); 177 put = new Put(ROW4); 178 put.addColumn(FAMILY, QUALIFIER, data); 179 table.put(put); 180 // data was in memstore so don't expect any changes 181 region.flush(true); 182 put = new Put(ROW4); 183 put.addColumn(FAMILY, QUALIFIER1, data); 184 table.put(put); 185 put = new Put(ROW5); 186 put.addColumn(FAMILY, QUALIFIER, data); 187 table.put(put); 188 put = new Put(ROW5); 189 put.addColumn(FAMILY, QUALIFIER1, data); 190 table.put(put); 191 // data was in memstore so don't expect any changes 192 region.flush(true); 193 // Load cache 194 Scan s = new Scan(); 195 s.setMaxResultSize(1000); 196 int count; 197 try (ResultScanner scanner = table.getScanner(s)) { 198 count = Iterables.size(scanner); 199 } 200 assertEquals("Count all the rows ", 6, count); 201 // all the cache is loaded 202 // trigger a major compaction 203 ScannerThread scannerThread = new ScannerThread(table, cache); 204 scannerThread.start(); 205 region.compact(true); 206 s = new Scan(); 207 s.setMaxResultSize(1000); 208 try (ResultScanner scanner = table.getScanner(s)) { 209 count = Iterables.size(scanner); 210 } 211 assertEquals("Count all the rows ", 6, count); 212 } finally { 213 table.close(); 214 } 215 } 216 217 private static class ScannerThread extends Thread { 218 private final Table table; 219 private final BlockCache cache; 220 221 public ScannerThread(Table table, BlockCache cache) { 222 this.table = table; 223 this.cache = cache; 224 } 225 226 @Override 227 public void run() { 228 Scan s = new Scan().withStartRow(ROW4).withStopRow(ROW5).setCaching(1); 229 try { 230 while (!doScan.get()) { 231 try { 232 // Sleep till you start scan 233 Thread.sleep(1); 234 } catch (InterruptedException e) { 235 } 236 } 237 List<BlockCacheKey> cacheList = new ArrayList<>(); 238 Iterator<CachedBlock> iterator = cache.iterator(); 239 // evict all the blocks 240 while (iterator.hasNext()) { 241 CachedBlock next = iterator.next(); 242 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 243 cacheList.add(cacheKey); 244 // evict what ever is available 245 cache.evictBlock(cacheKey); 246 } 247 try (ResultScanner scanner = table.getScanner(s)) { 248 while (scanner.next() != null) { 249 } 250 } 251 compactReadLatch.countDown(); 252 } catch (IOException e) { 253 } 254 } 255 } 256 257 public static class CompactorRegionObserver implements RegionCoprocessor, RegionObserver { 258 259 @Override 260 public Optional<RegionObserver> getRegionObserver() { 261 return Optional.of(this); 262 } 263 264 @Override 265 public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c, 266 Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, 267 CompactionRequest request) throws IOException { 268 return new CompactorInternalScanner(scanner); 269 } 270 } 271 272 private static final class CompactorInternalScanner extends DelegatingInternalScanner { 273 274 public CompactorInternalScanner(InternalScanner scanner) { 275 super(scanner); 276 } 277 278 @Override 279 public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext) 280 throws IOException { 281 boolean next = scanner.next(result, scannerContext); 282 for (Iterator<? super ExtendedCell> iter = result.iterator(); iter.hasNext();) { 283 Cell cell = (Cell) iter.next(); 284 if (CellComparatorImpl.COMPARATOR.compareRows(cell, ROW2, 0, ROW2.length) == 0) { 285 try { 286 // hold the compaction 287 // set doscan to true 288 doScan.compareAndSet(false, true); 289 compactReadLatch.await(); 290 } catch (InterruptedException e) { 291 } 292 } 293 } 294 return next; 295 } 296 } 297 298 @Test 299 public void testHBASE16372InReadPath() throws Exception { 300 final TableName tableName = TableName.valueOf(name.getMethodName()); 301 // Create a table with block size as 1024 302 try (Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, null)) { 303 // get the block cache and region 304 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); 305 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); 306 HRegion region = 307 (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); 308 HStore store = region.getStores().iterator().next(); 309 CacheConfig cacheConf = store.getCacheConfig(); 310 cacheConf.setCacheDataOnWrite(true); 311 cacheConf.setEvictOnClose(true); 312 final BlockCache cache = cacheConf.getBlockCache().get(); 313 // insert data. 5 Rows are added 314 Put put = new Put(ROW); 315 put.addColumn(FAMILY, QUALIFIER, data); 316 table.put(put); 317 put = new Put(ROW); 318 put.addColumn(FAMILY, QUALIFIER1, data); 319 table.put(put); 320 put = new Put(ROW1); 321 put.addColumn(FAMILY, QUALIFIER, data); 322 table.put(put); 323 put = new Put(ROW1); 324 put.addColumn(FAMILY, QUALIFIER1, data); 325 table.put(put); 326 put = new Put(ROW2); 327 put.addColumn(FAMILY, QUALIFIER, data); 328 table.put(put); 329 put = new Put(ROW2); 330 put.addColumn(FAMILY, QUALIFIER1, data); 331 table.put(put); 332 put = new Put(ROW3); 333 put.addColumn(FAMILY, QUALIFIER, data); 334 table.put(put); 335 put = new Put(ROW3); 336 put.addColumn(FAMILY, QUALIFIER1, data); 337 table.put(put); 338 put = new Put(ROW4); 339 put.addColumn(FAMILY, QUALIFIER, data); 340 table.put(put); 341 put = new Put(ROW4); 342 put.addColumn(FAMILY, QUALIFIER1, data); 343 table.put(put); 344 put = new Put(ROW5); 345 put.addColumn(FAMILY, QUALIFIER, data); 346 table.put(put); 347 put = new Put(ROW5); 348 put.addColumn(FAMILY, QUALIFIER1, data); 349 table.put(put); 350 // data was in memstore so don't expect any changes 351 region.flush(true); 352 // Load cache 353 Scan s = new Scan(); 354 s.setMaxResultSize(1000); 355 int count; 356 try (ResultScanner scanner = table.getScanner(s)) { 357 count = Iterables.size(scanner); 358 } 359 assertEquals("Count all the rows ", 6, count); 360 361 // Scan from cache 362 s = new Scan(); 363 // Start a scan from row3 364 s.setCaching(1); 365 s.withStartRow(ROW1); 366 // set partial as true so that the scan can send partial columns also 367 s.setAllowPartialResults(true); 368 s.setMaxResultSize(1000); 369 try (ScanPerNextResultScanner scanner = 370 new ScanPerNextResultScanner(TEST_UTIL.getAsyncConnection().getTable(tableName), s)) { 371 Thread evictorThread = new Thread() { 372 @Override 373 public void run() { 374 List<BlockCacheKey> cacheList = new ArrayList<>(); 375 Iterator<CachedBlock> iterator = cache.iterator(); 376 // evict all the blocks 377 while (iterator.hasNext()) { 378 CachedBlock next = iterator.next(); 379 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 380 cacheList.add(cacheKey); 381 /** 382 * There is only one Block referenced by rpc,here we evict blocks which have no rpc 383 * referenced. 384 */ 385 evictBlock(cache, cacheKey); 386 } 387 try { 388 Thread.sleep(1); 389 } catch (InterruptedException e1) { 390 } 391 iterator = cache.iterator(); 392 int refBlockCount = 0; 393 while (iterator.hasNext()) { 394 iterator.next(); 395 refBlockCount++; 396 } 397 assertEquals("One block should be there ", 1, refBlockCount); 398 // Rescan to prepopulate the data 399 // cache this row. 400 Scan s1 = new Scan(); 401 // This scan will start from ROW1 and it will populate the cache with a 402 // row that is lower than ROW3. 403 s1.withStartRow(ROW3); 404 s1.withStopRow(ROW5); 405 s1.setCaching(1); 406 407 try (ResultScanner scanner = table.getScanner(s1)) { 408 int count = Iterables.size(scanner); 409 assertEquals("Count the rows", 2, count); 410 int newBlockRefCount = 0; 411 List<BlockCacheKey> newCacheList = new ArrayList<>(); 412 while (true) { 413 newBlockRefCount = 0; 414 newCacheList.clear(); 415 iterator = cache.iterator(); 416 while (iterator.hasNext()) { 417 CachedBlock next = iterator.next(); 418 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); 419 newCacheList.add(cacheKey); 420 } 421 for (BlockCacheKey key : cacheList) { 422 if (newCacheList.contains(key)) { 423 newBlockRefCount++; 424 } 425 } 426 if (newBlockRefCount == 6) { 427 break; 428 } 429 } 430 latch.countDown(); 431 } catch (IOException e) { 432 } 433 } 434 }; 435 count = 0; 436 while (scanner.next() != null) { 437 count++; 438 if (count == 2) { 439 evictorThread.start(); 440 latch.await(); 441 } 442 } 443 } 444 assertEquals("Count should give all rows ", 10, count); 445 } 446 } 447 448 /** 449 * For {@link BucketCache},we only evict Block if there is no rpc referenced. 450 */ 451 private void evictBlock(BlockCache blockCache, BlockCacheKey blockCacheKey) { 452 assertTrue(blockCache instanceof CombinedBlockCache); 453 BlockCache[] blockCaches = blockCache.getBlockCaches(); 454 for (BlockCache currentBlockCache : blockCaches) { 455 if (currentBlockCache instanceof BucketCache) { 456 ((BucketCache) currentBlockCache).evictBlockIfNoRpcReferenced(blockCacheKey); 457 } else { 458 currentBlockCache.evictBlock(blockCacheKey); 459 } 460 } 461 462 } 463}