001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.Optional;
025import java.util.concurrent.atomic.AtomicInteger;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtility;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.RegionLocator;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.coprocessor.ObserverContext;
035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
036import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
037import org.apache.hadoop.hbase.coprocessor.RegionObserver;
038import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
039import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
040import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
041import org.apache.hadoop.hbase.testclassification.MediumTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.hamcrest.Matchers;
044import org.junit.AfterClass;
045import org.junit.BeforeClass;
046import org.junit.ClassRule;
047import org.junit.Rule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.junit.rules.TestName;
051
052@Category({ MediumTests.class, CoprocessorTests.class })
053public class TestCompactionWithShippingCoprocessor {
054
055  private static final AtomicInteger SHIPPED_COUNT = new AtomicInteger();
056
057  @ClassRule
058  public static final HBaseClassTestRule CLASS_RULE =
059    HBaseClassTestRule.forClass(TestCompactionWithShippingCoprocessor.class);
060
061  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
062  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
063
064  @Rule
065  public TestName name = new TestName();
066
067  @BeforeClass
068  public static void setUpBeforeClass() throws Exception {
069    Configuration conf = TEST_UTIL.getConfiguration();
070    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry
071    TEST_UTIL.startMiniCluster(1);
072  }
073
074  @AfterClass
075  public static void tearDownAfterClass() throws Exception {
076    TEST_UTIL.shutdownMiniCluster();
077  }
078
079  /**
080   * Verifies that if a coproc returns an InternalScanner which implements Shipper, the shippped
081   * method is appropriately called in Compactor.
082   */
083  @Test
084  public void testCoprocScannersExtendingShipperGetShipped() throws Exception {
085    int shippedCountBefore = SHIPPED_COUNT.get();
086    final TableName tableName = TableName.valueOf(name.getMethodName());
087    // Create a table with block size as 1024
088    final Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1, 1024,
089      CompactionObserver.class.getName());
090    TEST_UTIL.loadTable(table, FAMILY);
091    TEST_UTIL.flush();
092    try {
093      // get the block cache and region
094      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
095      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
096      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
097      // trigger a major compaction
098      TEST_UTIL.compact(true);
099      assertThat(SHIPPED_COUNT.get(), Matchers.greaterThan(shippedCountBefore));
100    } finally {
101      table.close();
102    }
103  }
104
105  public static class CompactionObserver implements RegionCoprocessor, RegionObserver {
106
107    @Override
108    public Optional<RegionObserver> getRegionObserver() {
109      return Optional.of(this);
110    }
111
112    @Override
113    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
114      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
115      CompactionRequest request) throws IOException {
116      return new ShippedObservingScanner(scanner);
117    }
118  }
119
120  public static class ShippedObservingScanner implements InternalScanner, Shipper {
121
122    protected final InternalScanner scanner;
123
124    public ShippedObservingScanner(InternalScanner scanner) {
125      this.scanner = scanner;
126    }
127
128    @Override
129    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
130      return scanner.next(result, scannerContext);
131    }
132
133    @Override
134    public void close() throws IOException {
135      scanner.close();
136    }
137
138    @Override
139    public void shipped() throws IOException {
140      if (scanner instanceof Shipper) {
141        SHIPPED_COUNT.incrementAndGet();
142        ((Shipper) scanner).shipped();
143      }
144    }
145  }
146}