/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.hamcrest.MatcherAssert.assertThat;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category({ MediumTests.class, CoprocessorTests.class })
public class TestCompactionWithShippingCoprocessor {

  /** Incremented every time shipped() is called on the coprocessor-wrapped compaction scanner. */
  private static final AtomicInteger SHIPPED_COUNT = new AtomicInteger();

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCompactionWithShippingCoprocessor.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final byte[] FAMILY = Bytes.toBytes("testFamily");

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
    TEST_UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Verifies that if a coprocessor returns an InternalScanner which implements Shipper, the
   * shipped() method is appropriately called in Compactor.
   */
  @Test
  public void testCoprocScannersExtendingShipperGetShipped() throws Exception {
    int shippedCountBefore = SHIPPED_COUNT.get();
    final TableName tableName = TableName.valueOf(name.getMethodName());
    // Create a table with a block size of 1024 and the CompactionObserver coprocessor attached
    final Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1, 1024,
      CompactionObserver.class.getName());
    TEST_UTIL.loadTable(table, FAMILY);
    TEST_UTIL.flush();
    try {
      // look up the region hosting the table
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      // trigger a major compaction
      TEST_UTIL.compact(true);
      assertThat(SHIPPED_COUNT.get(), Matchers.greaterThan(shippedCountBefore));
    } finally {
      table.close();
    }
  }

  /**
   * Coprocessor that wraps the compaction scanner in a ShippedObservingScanner so that calls to
   * shipped() can be observed.
   */
  public static class CompactionObserver implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
      return new ShippedObservingScanner(scanner);
    }
  }

  /**
   * Delegating scanner that implements Shipper and bumps SHIPPED_COUNT whenever shipped() is
   * propagated to the wrapped scanner.
   */
  public static class ShippedObservingScanner implements InternalScanner, Shipper {

    protected final InternalScanner scanner;

    public ShippedObservingScanner(InternalScanner scanner) {
      this.scanner = scanner;
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
      return scanner.next(result, scannerContext);
    }

    @Override
    public void close() throws IOException {
      scanner.close();
    }

    @Override
    public void shipped() throws IOException {
      if (scanner instanceof Shipper) {
        SHIPPED_COUNT.incrementAndGet();
        ((Shipper) scanner).shipped();
      }
    }
  }
}