/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Confirm that CompactionLifeCycleTracker works correctly, as we do not use it in our own code.
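 * <p>
 * The registered CompactionObserver coprocessor asserts that the tracker instance passed to each
 * compaction CP hook is the same instance that was handed to HRegion.requestCompaction, and the
 * Tracker implementation below records which stores were compacted, skipped, and completed.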
 */
@Category({ CoprocessorTests.class, MediumTests.class })
public class TestCompactionLifeCycleTracker {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCompactionLifeCycleTracker.class);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  private static final TableName NAME =
    TableName.valueOf(TestCompactionLifeCycleTracker.class.getSimpleName());

  private static final byte[] CF1 = Bytes.toBytes("CF1");

  private static final byte[] CF2 = Bytes.toBytes("CF2");

  private static final byte[] QUALIFIER = Bytes.toBytes("CQ");

  private HRegion region;

  private static CompactionLifeCycleTracker TRACKER = null;

  // make sure that we pass the correct CompactionLifeCycleTracker to CP hooks.
  public static final class CompactionObserver implements RegionObserver, RegionCoprocessor {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preCompactSelection(ObserverContext<? extends RegionCoprocessorEnvironment> c,
      Store store, List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker)
      throws IOException {
      if (TRACKER != null) {
        assertSame(tracker, TRACKER);
      }
    }

    @Override
    public void postCompactSelection(ObserverContext<? extends RegionCoprocessorEnvironment> c,
      Store store, List<? extends StoreFile> selected, CompactionLifeCycleTracker tracker,
      CompactionRequest request) {
      if (TRACKER != null) {
        assertSame(tracker, TRACKER);
      }
    }

    @Override
    public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c,
      Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
      if (TRACKER != null) {
        assertSame(tracker, TRACKER);
      }
      return scanner;
    }

    @Override
    public void postCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c, Store store,
      StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request)
      throws IOException {
      if (TRACKER != null) {
        assertSame(tracker, TRACKER);
      }
    }
  }

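  // Lower the minimum number of files needed for compaction to 2 so that the two store files
  // flushed in setUp() are enough to make CF1 eligible for compaction.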
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    UTIL.getConfiguration().setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2);
    UTIL.startMiniCluster(3);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws IOException {
    UTIL.getAdmin()
      .createTable(TableDescriptorBuilder.newBuilder(NAME)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF2))
        .setCoprocessor(CompactionObserver.class.getName()).build());
    try (Table table = UTIL.getConnection().getTable(NAME)) {
      for (int i = 0; i < 100; i++) {
        byte[] row = Bytes.toBytes(i);
        table
          .put(new Put(row).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
            .setFamily(CF1).setQualifier(QUALIFIER).setTimestamp(HConstants.LATEST_TIMESTAMP)
            .setType(Cell.Type.Put).setValue(Bytes.toBytes(i)).build()));
      }
      UTIL.getAdmin().flush(NAME);
      for (int i = 100; i < 200; i++) {
        byte[] row = Bytes.toBytes(i);
        table
          .put(new Put(row).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
            .setFamily(CF1).setQualifier(QUALIFIER).setTimestamp(HConstants.LATEST_TIMESTAMP)
            .setType(Type.Put).setValue(Bytes.toBytes(i)).build()));
      }
      UTIL.getAdmin().flush(NAME);
    }
    region = UTIL.getHBaseCluster().getRegions(NAME).get(0);
    assertEquals(2, region.getStore(CF1).getStorefilesCount());
    assertEquals(0, region.getStore(CF2).getStorefilesCount());
  }

  @After
  public void tearDown() throws IOException {
    region = null;
    TRACKER = null;
    UTIL.deleteTable(NAME);
  }

  private static final class Tracker implements CompactionLifeCycleTracker {

    final List<Pair<Store, String>> notExecutedStores = new ArrayList<>();

    final List<Store> beforeExecuteStores = new ArrayList<>();

    final List<Store> afterExecuteStores = new ArrayList<>();

    private boolean completed = false;

    @Override
    public void notExecuted(Store store, String reason) {
      notExecutedStores.add(Pair.newPair(store, reason));
    }

    @Override
    public void beforeExecution(Store store) {
      beforeExecuteStores.add(store);
    }

    @Override
    public void afterExecution(Store store) {
      afterExecuteStores.add(store);
    }

    @Override
    public synchronized void completed() {
      completed = true;
      notifyAll();
    }

    public synchronized void await() throws InterruptedException {
      while (!completed) {
        wait();
      }
    }
  }

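  // A region level compaction request should compact CF1, which has two store files, while CF2,
  // which has no store files, is reported as not executed.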
  @Test
  public void testRequestOnRegion() throws IOException, InterruptedException {
    Tracker tracker = new Tracker();
    TRACKER = tracker;
    region.requestCompaction("test", Store.PRIORITY_USER, false, tracker);
    tracker.await();
    assertEquals(1, tracker.notExecutedStores.size());
    assertEquals(Bytes.toString(CF2),
      tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName());
    assertThat(tracker.notExecutedStores.get(0).getSecond(),
      containsString("compaction request was cancelled"));

    assertEquals(1, tracker.beforeExecuteStores.size());
    assertEquals(Bytes.toString(CF1), tracker.beforeExecuteStores.get(0).getColumnFamilyName());

    assertEquals(1, tracker.afterExecuteStores.size());
    assertEquals(Bytes.toString(CF1), tracker.afterExecuteStores.get(0).getColumnFamilyName());
  }

  @Test
  public void testRequestOnStore() throws IOException, InterruptedException {
    Tracker tracker = new Tracker();
    TRACKER = tracker;
    region.requestCompaction(CF1, "test", Store.PRIORITY_USER, false, tracker);
    tracker.await();
    assertTrue(tracker.notExecutedStores.isEmpty());
    assertEquals(1, tracker.beforeExecuteStores.size());
    assertEquals(Bytes.toString(CF1), tracker.beforeExecuteStores.get(0).getColumnFamilyName());
    assertEquals(1, tracker.afterExecuteStores.size());
    assertEquals(Bytes.toString(CF1), tracker.afterExecuteStores.get(0).getColumnFamilyName());

    tracker = new Tracker();
    TRACKER = tracker;
    region.requestCompaction(CF2, "test", Store.PRIORITY_USER, false, tracker);
    tracker.await();
    assertEquals(1, tracker.notExecutedStores.size());
    assertEquals(Bytes.toString(CF2),
      tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName());
    assertThat(tracker.notExecutedStores.get(0).getSecond(),
      containsString("compaction request was cancelled"));
    assertTrue(tracker.beforeExecuteStores.isEmpty());
    assertTrue(tracker.afterExecuteStores.isEmpty());
  }

  // This test assumed that compaction would not happen with a null user. But a null user means a
  // system generated compaction, so compaction should happen even if the space quota is violated.
  // Hence this test is ignored.
  @Ignore
  @Test
  public void testSpaceQuotaViolation() throws IOException, InterruptedException {
    region.getRegionServerServices().getRegionServerSpaceQuotaManager().enforceViolationPolicy(NAME,
      new SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES_COMPACTIONS), 10L,
        100L));
    Tracker tracker = new Tracker();
    TRACKER = tracker;
    region.requestCompaction("test", Store.PRIORITY_USER, false, tracker);
    tracker.await();
    assertEquals(2, tracker.notExecutedStores.size());
    tracker.notExecutedStores.sort((p1, p2) -> p1.getFirst().getColumnFamilyName()
      .compareTo(p2.getFirst().getColumnFamilyName()));

    assertEquals(Bytes.toString(CF2),
      tracker.notExecutedStores.get(1).getFirst().getColumnFamilyName());
    assertThat(tracker.notExecutedStores.get(1).getSecond(),
      containsString("space quota violation"));

    assertTrue(tracker.beforeExecuteStores.isEmpty());
    assertTrue(tracker.afterExecuteStores.isEmpty());
  }
}