/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

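/**
 * Test that scans never hand out cells that still reference blocks which can be evicted or
 * recycled underneath them (HBASE-16372). Both tests race block eviction against an ongoing scan,
 * one on the compaction write path and one on the read path, and verify that every row is still
 * read back intact.
 */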
@Category({ LargeTests.class, ClientTests.class })
public class TestAvoidCellReferencesIntoShippedBlocks {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAvoidCellReferencesIntoShippedBlocks.class);

  protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  static byte[][] ROWS = new byte[2][];
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] ROW4 = Bytes.toBytes("testRow4");
  private static byte[] ROW5 = Bytes.toBytes("testRow5");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1");
  private static byte[] data = new byte[1000];
  protected static int SLAVES = 1;
  private CountDownLatch latch = new CountDownLatch(1);
  private static CountDownLatch compactReadLatch = new CountDownLatch(1);
  private static AtomicBoolean doScan = new AtomicBoolean(false);

  @Rule
  public TestName name = new TestName();

  /**
   * Starts a single-node mini cluster with an off-heap bucket cache, client retries disabled, and
   * a raised compaction threshold so automatic compactions do not interfere with the tests.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ROWS[0] = ROW;
    ROWS[1] = ROW1;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      MultiRowMutationEndpoint.class.getName());
    conf.setInt("hbase.regionserver.handler.count", 20);
    conf.setInt("hbase.bucketcache.size", 400);
    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setInt("hbase.hstore.compactionThreshold", 7);
    conf.setFloat("hfile.block.cache.size", 0.2f);
    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500000);
    FAMILIES_1[0] = FAMILY;
    TEST_UTIL.startMiniCluster(SLAVES);
    compactReadLatch = new CountDownLatch(1);
  }

  /**
   * Shuts down the mini cluster.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

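  /**
   * Write-path variant: a major compaction is held at ROW2 by {@link CompactorRegionObserver}
   * while {@link ScannerThread} evicts every cached block and scans ROW4..ROW5. Afterwards a full
   * scan must still return all six rows.
   */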
  @Test
  public void testHBase16372InCompactionWritePath() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    // Create a table with a block size of 1024 bytes
    final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
      CompactorRegionObserver.class.getName());
    try {
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
        (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      final BlockCache cache = cacheConf.getBlockCache().get();
      // insert data; six rows (ROW..ROW5) are added, two columns each
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      // flush the memstore so the data lands in a new store file
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      // flush the memstore so the data lands in a new store file
      region.flush(true);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW4);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      // flush the memstore so the data lands in a new store file
      region.flush(true);
      put = new Put(ROW4);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW5);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW5);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      // flush the memstore so the data lands in a new store file
      region.flush(true);
      // Load the cache by scanning everything once
      Scan s = new Scan();
      s.setMaxResultSize(1000);
      int count;
      try (ResultScanner scanner = table.getScanner(s)) {
        count = Iterables.size(scanner);
      }
      assertEquals("Count all the rows", 6, count);
      // the cache is now fully loaded; trigger a major compaction while a
      // concurrent scan races block eviction against it
      ScannerThread scannerThread = new ScannerThread(table, cache);
      scannerThread.start();
      region.compact(true);
      s = new Scan();
      s.setMaxResultSize(1000);
      try (ResultScanner scanner = table.getScanner(s)) {
        count = Iterables.size(scanner);
      }
      assertEquals("Count all the rows", 6, count);
    } finally {
      table.close();
    }
  }

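  /**
   * Waits until the compaction observer flips the doScan flag, evicts every block it can from the
   * cache, scans ROW4..ROW5, and finally releases the held compaction via compactReadLatch.
   */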
  private static class ScannerThread extends Thread {
    private final Table table;
    private final BlockCache cache;

    public ScannerThread(Table table, BlockCache cache) {
      this.table = table;
      this.cache = cache;
    }

    @Override
    public void run() {
      Scan s = new Scan().withStartRow(ROW4).withStopRow(ROW5).setCaching(1);
      try {
        while (!doScan.get()) {
          try {
            // spin until the compaction observer signals us to start scanning
            Thread.sleep(1);
          } catch (InterruptedException e) {
            // ignore and keep waiting
          }
        }
        List<BlockCacheKey> cacheList = new ArrayList<>();
        Iterator<CachedBlock> iterator = cache.iterator();
        // evict all the blocks
        while (iterator.hasNext()) {
          CachedBlock next = iterator.next();
          BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
          cacheList.add(cacheKey);
          // evict whatever is evictable
          cache.evictBlock(cacheKey);
        }
        // drain the scan; the rows must still come back although their blocks were evicted
        try (ResultScanner scanner = table.getScanner(s)) {
          while (scanner.next() != null) {
          }
        }
        compactReadLatch.countDown();
      } catch (IOException e) {
        // swallowed; the latch never counts down and the held compaction times the test out
      }
    }
  }

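  /**
   * Region observer that wraps the compaction scanner in a {@link CompactorInternalScanner} so
   * the compaction can be held mid-flight while a concurrent scan runs.
   */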
  public static class CompactorRegionObserver implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c,
      Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
      return new CompactorInternalScanner(scanner);
    }
  }

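  /**
   * Delegating scanner that pauses the compaction when it reaches ROW2: it sets the doScan flag
   * so ScannerThread starts its scan, then blocks on compactReadLatch until that scan finishes.
   */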
  private static final class CompactorInternalScanner extends DelegatingInternalScanner {

    public CompactorInternalScanner(InternalScanner scanner) {
      super(scanner);
    }

    @Override
    public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext)
      throws IOException {
      boolean next = scanner.next(result, scannerContext);
      for (Iterator<? super ExtendedCell> iter = result.iterator(); iter.hasNext();) {
        Cell cell = (Cell) iter.next();
        if (CellComparatorImpl.COMPARATOR.compareRows(cell, ROW2, 0, ROW2.length) == 0) {
          try {
            // hold the compaction here: let the scanner thread run, then wait for it to finish
            doScan.compareAndSet(false, true);
            compactReadLatch.await();
          } catch (InterruptedException e) {
            // ignore and resume the compaction
          }
        }
      }
      return next;
    }
  }

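  /**
   * Read-path variant: while a partial-result scan is mid-row, an evictor thread evicts every
   * block it safely can, checks that exactly one block stays pinned by the ongoing RPC, and
   * rescans to repopulate the cache before the original scan resumes.
   */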
  @Test
  public void testHBASE16372InReadPath() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    // Create a table with a block size of 1024 bytes
    try (Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, null)) {
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
        (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      final BlockCache cache = cacheConf.getBlockCache().get();
      // insert data; six rows (ROW..ROW5) are added, two columns each
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW4);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW4);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      put = new Put(ROW5);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW5);
      put.addColumn(FAMILY, QUALIFIER1, data);
      table.put(put);
      // flush the memstore so the data lands in a new store file
      region.flush(true);
      // Load the cache by scanning everything once
      Scan s = new Scan();
      s.setMaxResultSize(1000);
      int count;
      try (ResultScanner scanner = table.getScanner(s)) {
        count = Iterables.size(scanner);
      }
      assertEquals("Count all the rows", 6, count);

      // Scan from the cache
      s = new Scan();
      // Start a scan from ROW1, fetching one row per RPC
      s.setCaching(1);
      s.withStartRow(ROW1);
      // allow partial results so the scan can also return partial rows
      s.setAllowPartialResults(true);
      s.setMaxResultSize(1000);
      try (ScanPerNextResultScanner scanner =
        new ScanPerNextResultScanner(TEST_UTIL.getAsyncConnection().getTable(tableName), s)) {
        Thread evictorThread = new Thread() {
          @Override
          public void run() {
            List<BlockCacheKey> cacheList = new ArrayList<>();
            Iterator<CachedBlock> iterator = cache.iterator();
            // evict all the blocks
            while (iterator.hasNext()) {
              CachedBlock next = iterator.next();
              BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
              cacheList.add(cacheKey);
              // only one block is referenced by the ongoing rpc; evict the blocks
              // that have no rpc reference
              evictBlock(cache, cacheKey);
            }
            try {
              Thread.sleep(1);
            } catch (InterruptedException e1) {
              // ignore
            }
            iterator = cache.iterator();
            int refBlockCount = 0;
            while (iterator.hasNext()) {
              iterator.next();
              refBlockCount++;
            }
            assertEquals("One block should be there", 1, refBlockCount);
            // Rescan ROW3..ROW5 to repopulate the cache with the evicted blocks
            Scan s1 = new Scan();
            s1.withStartRow(ROW3);
            s1.withStopRow(ROW5);
            s1.setCaching(1);

            try (ResultScanner scanner = table.getScanner(s1)) {
              int count = Iterables.size(scanner);
              assertEquals("Count the rows", 2, count);
              int newBlockRefCount = 0;
              List<BlockCacheKey> newCacheList = new ArrayList<>();
              // spin until six of the originally cached blocks are back in the cache
              while (true) {
                newBlockRefCount = 0;
                newCacheList.clear();
                iterator = cache.iterator();
                while (iterator.hasNext()) {
                  CachedBlock next = iterator.next();
                  BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
                  newCacheList.add(cacheKey);
                }
                for (BlockCacheKey key : cacheList) {
                  if (newCacheList.contains(key)) {
                    newBlockRefCount++;
                  }
                }
                if (newBlockRefCount == 6) {
                  break;
                }
              }
              latch.countDown();
            } catch (IOException e) {
              // swallowed; the latch never counts down and the waiting scan times the test out
            }
          }
        };
        count = 0;
        while (scanner.next() != null) {
          count++;
          if (count == 2) {
            // after two partial results, run the evictor and wait for it to finish
            evictorThread.start();
            latch.await();
          }
        }
      }
      assertEquals("Count should give all rows", 10, count);
    }
  }

  /**
   * For a {@link BucketCache} we may only evict a block if it has no rpc reference; other caches
   * can evict unconditionally.
   */
  private void evictBlock(BlockCache blockCache, BlockCacheKey blockCacheKey) {
    assertTrue(blockCache instanceof CombinedBlockCache);
    BlockCache[] blockCaches = blockCache.getBlockCaches();
    for (BlockCache currentBlockCache : blockCaches) {
      if (currentBlockCache instanceof BucketCache) {
        ((BucketCache) currentBlockCache).evictBlockIfNoRpcReferenced(blockCacheKey);
      } else {
        currentBlockCache.evictBlock(blockCacheKey);
      }
    }
  }
}