001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY; 021import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY_BYTES; 022import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1; 023import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; 024import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY; 025import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY; 026import static org.hamcrest.MatcherAssert.assertThat; 027import static org.hamcrest.Matchers.allOf; 028import static org.hamcrest.Matchers.containsString; 029import static org.hamcrest.Matchers.hasProperty; 030import static org.hamcrest.Matchers.instanceOf; 031import static org.hamcrest.Matchers.notNullValue; 032import static org.junit.Assert.assertEquals; 033import static org.junit.Assert.assertFalse; 034import static org.junit.Assert.assertThrows; 035import static org.junit.Assert.assertTrue; 036import static org.junit.Assert.fail; 037import static org.mockito.ArgumentMatchers.any; 038import static org.mockito.Mockito.doAnswer; 039import static org.mockito.Mockito.mock; 040import static org.mockito.Mockito.spy; 041import static org.mockito.Mockito.when; 042 043import java.io.IOException; 044import java.io.InputStream; 045import java.io.OutputStream; 046import java.util.ArrayList; 047import java.util.Collection; 048import java.util.Collections; 049import java.util.List; 050import java.util.Objects; 051import java.util.Optional; 052import java.util.concurrent.CountDownLatch; 053import java.util.concurrent.TimeUnit; 054import java.util.zip.GZIPInputStream; 055import org.apache.hadoop.conf.Configuration; 056import org.apache.hadoop.fs.FSDataOutputStream; 057import org.apache.hadoop.fs.FileStatus; 058import org.apache.hadoop.fs.FileSystem; 059import org.apache.hadoop.fs.Path; 060import org.apache.hadoop.hbase.ChoreService; 061import org.apache.hadoop.hbase.HBaseClassTestRule; 062import org.apache.hadoop.hbase.HBaseConfiguration; 063import org.apache.hadoop.hbase.HBaseTestingUtil; 064import org.apache.hadoop.hbase.HConstants; 065import org.apache.hadoop.hbase.HTestConst; 066import org.apache.hadoop.hbase.KeyValue; 067import org.apache.hadoop.hbase.Waiter; 068import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 069import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 070import org.apache.hadoop.hbase.client.Delete; 071import org.apache.hadoop.hbase.client.Durability; 072import org.apache.hadoop.hbase.client.Put; 073import org.apache.hadoop.hbase.client.Table; 074import org.apache.hadoop.hbase.client.TableDescriptor; 075import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 076import org.apache.hadoop.hbase.io.compress.Compression; 077import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; 078import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 079import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; 080import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; 081import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; 082import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 083import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 084import org.apache.hadoop.hbase.security.User; 085import org.apache.hadoop.hbase.testclassification.MediumTests; 086import org.apache.hadoop.hbase.testclassification.RegionServerTests; 087import org.apache.hadoop.hbase.util.Bytes; 088import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 089import org.apache.hadoop.hbase.util.Threads; 090import org.apache.hadoop.hbase.wal.WAL; 091import org.apache.hadoop.io.IOUtils; 092import org.junit.After; 093import org.junit.Assume; 094import org.junit.Before; 095import org.junit.ClassRule; 096import org.junit.Rule; 097import org.junit.Test; 098import org.junit.experimental.categories.Category; 099import org.junit.rules.TestName; 100import org.mockito.Mockito; 101import org.mockito.invocation.InvocationOnMock; 102import org.mockito.stubbing.Answer; 103import org.slf4j.LoggerFactory; 104 105/** 106 * Test compaction framework and common functions 107 */ 108@Category({ RegionServerTests.class, MediumTests.class }) 109public class TestCompaction { 110 111 @ClassRule 112 public static final HBaseClassTestRule CLASS_RULE = 113 HBaseClassTestRule.forClass(TestCompaction.class); 114 115 @Rule 116 public TestName name = new TestName(); 117 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 118 protected Configuration conf = UTIL.getConfiguration(); 119 120 private HRegion r = null; 121 private TableDescriptor tableDescriptor = null; 122 private static final byte[] COLUMN_FAMILY = fam1; 123 private final byte[] STARTROW = Bytes.toBytes(START_KEY); 124 private static final byte[] COLUMN_FAMILY_TEXT = COLUMN_FAMILY; 125 private int compactionThreshold; 126 private byte[] secondRowBytes, thirdRowBytes; 127 private static final long MAX_FILES_TO_COMPACT = 10; 128 private final byte[] FAMILY = Bytes.toBytes("cf"); 129 130 /** constructor */ 131 public TestCompaction() { 132 // Set cache flush size to 1MB 133 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024); 134 conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100); 135 conf.setLong(HConstants.COMPACTION_SCANNER_SIZE_MAX, 10L); 136 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 137 NoLimitThroughputController.class.getName()); 138 compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3); 139 140 secondRowBytes = START_KEY_BYTES.clone(); 141 // Increment the least significant character so we get to next row. 142 secondRowBytes[START_KEY_BYTES.length - 1]++; 143 thirdRowBytes = START_KEY_BYTES.clone(); 144 thirdRowBytes[START_KEY_BYTES.length - 1] = 145 (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2); 146 } 147 148 @Before 149 public void setUp() throws Exception { 150 TableDescriptorBuilder builder = UTIL.createModifyableTableDescriptor(name.getMethodName()); 151 if (name.getMethodName().equals("testCompactionSeqId")) { 152 UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10"); 153 UTIL.getConfiguration().set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY, 154 DummyCompactor.class.getName()); 155 ColumnFamilyDescriptor familyDescriptor = 156 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(65536).build(); 157 builder.setColumnFamily(familyDescriptor); 158 } 159 if (name.getMethodName().equals("testCompactionWithCorruptBlock")) { 160 UTIL.getConfiguration().setBoolean("hbase.hstore.validate.read_fully", true); 161 ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY) 162 .setCompressionType(Compression.Algorithm.GZ).build(); 163 builder.setColumnFamily(familyDescriptor); 164 } 165 this.tableDescriptor = builder.build(); 166 this.r = UTIL.createLocalHRegion(tableDescriptor, null, null); 167 } 168 169 @After 170 public void tearDown() throws Exception { 171 WAL wal = r.getWAL(); 172 this.r.close(); 173 wal.close(); 174 } 175 176 /** 177 * Verify that you can stop a long-running compaction (used during RS shutdown) 178 */ 179 @Test 180 public void testInterruptCompactionBySize() throws Exception { 181 assertEquals(0, count()); 182 183 // lower the polling interval for this test 184 conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 /* 10 KB */); 185 186 try { 187 // Create a couple store files w/ 15KB (over 10KB interval) 188 int jmax = (int) Math.ceil(15.0 / compactionThreshold); 189 byte[] pad = new byte[1000]; // 1 KB chunk 190 for (int i = 0; i < compactionThreshold; i++) { 191 Table loader = new RegionAsTable(r); 192 Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); 193 p.setDurability(Durability.SKIP_WAL); 194 for (int j = 0; j < jmax; j++) { 195 p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad); 196 } 197 HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY)); 198 loader.put(p); 199 r.flush(true); 200 } 201 202 HRegion spyR = spy(r); 203 doAnswer(new Answer<Object>() { 204 @Override 205 public Object answer(InvocationOnMock invocation) throws Throwable { 206 r.writestate.writesEnabled = false; 207 return invocation.callRealMethod(); 208 } 209 }).when(spyR).doRegionCompactionPrep(); 210 211 // force a minor compaction, but not before requesting a stop 212 spyR.compactStores(); 213 214 // ensure that the compaction stopped, all old files are intact, 215 HStore s = r.getStore(COLUMN_FAMILY); 216 assertEquals(compactionThreshold, s.getStorefilesCount()); 217 assertTrue(s.getStorefilesSize() > 15 * 1000); 218 // and no new store files persisted past compactStores() 219 // only one empty dir exists in temp dir 220 FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir()); 221 assertEquals(1, ls.length); 222 Path storeTempDir = 223 new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY)); 224 assertTrue(r.getFilesystem().exists(storeTempDir)); 225 ls = r.getFilesystem().listStatus(storeTempDir); 226 assertEquals(0, ls.length); 227 } finally { 228 // don't mess up future tests 229 r.writestate.writesEnabled = true; 230 conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */); 231 232 // Delete all Store information once done using 233 for (int i = 0; i < compactionThreshold; i++) { 234 Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i))); 235 byte[][] famAndQf = { COLUMN_FAMILY, null }; 236 delete.addFamily(famAndQf[0]); 237 r.delete(delete); 238 } 239 r.flush(true); 240 241 // Multiple versions allowed for an entry, so the delete isn't enough 242 // Lower TTL and expire to ensure that all our entries have been wiped 243 final int ttl = 1000; 244 for (HStore store : this.r.stores.values()) { 245 ScanInfo old = store.getScanInfo(); 246 ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells()); 247 store.setScanInfo(si); 248 } 249 Thread.sleep(ttl); 250 251 r.compact(true); 252 assertEquals(0, count()); 253 } 254 } 255 256 @Test 257 public void testInterruptCompactionByTime() throws Exception { 258 assertEquals(0, count()); 259 260 // lower the polling interval for this test 261 conf.setLong(TIME_LIMIT_KEY, 1 /* 1ms */); 262 263 try { 264 // Create a couple store files w/ 15KB (over 10KB interval) 265 int jmax = (int) Math.ceil(15.0 / compactionThreshold); 266 byte[] pad = new byte[1000]; // 1 KB chunk 267 for (int i = 0; i < compactionThreshold; i++) { 268 Table loader = new RegionAsTable(r); 269 Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); 270 p.setDurability(Durability.SKIP_WAL); 271 for (int j = 0; j < jmax; j++) { 272 p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad); 273 } 274 HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY)); 275 loader.put(p); 276 r.flush(true); 277 } 278 279 HRegion spyR = spy(r); 280 doAnswer(new Answer<Object>() { 281 @Override 282 public Object answer(InvocationOnMock invocation) throws Throwable { 283 r.writestate.writesEnabled = false; 284 return invocation.callRealMethod(); 285 } 286 }).when(spyR).doRegionCompactionPrep(); 287 288 // force a minor compaction, but not before requesting a stop 289 spyR.compactStores(); 290 291 // ensure that the compaction stopped, all old files are intact, 292 HStore s = r.getStore(COLUMN_FAMILY); 293 assertEquals(compactionThreshold, s.getStorefilesCount()); 294 assertTrue(s.getStorefilesSize() > 15 * 1000); 295 // and no new store files persisted past compactStores() 296 // only one empty dir exists in temp dir 297 FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir()); 298 assertEquals(1, ls.length); 299 Path storeTempDir = 300 new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY)); 301 assertTrue(r.getFilesystem().exists(storeTempDir)); 302 ls = r.getFilesystem().listStatus(storeTempDir); 303 assertEquals(0, ls.length); 304 } finally { 305 // don't mess up future tests 306 r.writestate.writesEnabled = true; 307 conf.setLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */); 308 309 // Delete all Store information once done using 310 for (int i = 0; i < compactionThreshold; i++) { 311 Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i))); 312 byte[][] famAndQf = { COLUMN_FAMILY, null }; 313 delete.addFamily(famAndQf[0]); 314 r.delete(delete); 315 } 316 r.flush(true); 317 318 // Multiple versions allowed for an entry, so the delete isn't enough 319 // Lower TTL and expire to ensure that all our entries have been wiped 320 final int ttl = 1000; 321 for (HStore store : this.r.stores.values()) { 322 ScanInfo old = store.getScanInfo(); 323 ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells()); 324 store.setScanInfo(si); 325 } 326 Thread.sleep(ttl); 327 328 r.compact(true); 329 assertEquals(0, count()); 330 } 331 } 332 333 private int count() throws IOException { 334 int count = 0; 335 for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) { 336 f.initReader(); 337 try (StoreFileScanner scanner = f.getPreadScanner(false, Long.MAX_VALUE, 0, false)) { 338 scanner.seek(KeyValue.LOWESTKEY); 339 while (scanner.next() != null) { 340 count++; 341 } 342 } 343 } 344 return count; 345 } 346 347 private void createStoreFile(final HRegion region) throws IOException { 348 createStoreFile(region, Bytes.toString(COLUMN_FAMILY)); 349 } 350 351 private void createStoreFile(final HRegion region, String family) throws IOException { 352 Table loader = new RegionAsTable(region); 353 HTestConst.addContent(loader, family); 354 region.flush(true); 355 } 356 357 @Test 358 public void testCompactionWithCorruptResult() throws Exception { 359 int nfiles = 10; 360 for (int i = 0; i < nfiles; i++) { 361 createStoreFile(r); 362 } 363 HStore store = r.getStore(COLUMN_FAMILY); 364 365 Collection<HStoreFile> storeFiles = store.getStorefiles(); 366 DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor(); 367 CompactionRequestImpl request = new CompactionRequestImpl(storeFiles); 368 tool.compact(request, NoLimitThroughputController.INSTANCE, null); 369 370 // Now lets corrupt the compacted file. 371 FileSystem fs = store.getFileSystem(); 372 // default compaction policy created one and only one new compacted file 373 Path tmpPath = store.getRegionFileSystem().createTempName(); 374 try (FSDataOutputStream stream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) { 375 stream.writeChars("CORRUPT FILE!!!!"); 376 } 377 378 // The complete compaction should fail and the corrupt file should remain 379 // in the 'tmp' directory; 380 assertThrows(IOException.class, () -> store.doCompaction(null, null, null, 381 EnvironmentEdgeManager.currentTime(), Collections.singletonList(tmpPath))); 382 assertTrue(fs.exists(tmpPath)); 383 } 384 385 /** 386 * This test uses a hand-modified HFile, which is loaded in from the resources' path. That file 387 * was generated from the test support code in this class and then edited to corrupt the 388 * GZ-encoded block by zeroing-out the first two bytes of the GZip header, the "standard 389 * declaration" of {@code 1f 8b}, found at offset 33 in the file. I'm not sure why, but it seems 390 * that in this test context we do not enforce CRC checksums. Thus, this corruption manifests in 391 * the Decompressor rather than in the reader when it loads the block bytes and compares vs. the 392 * header. 393 */ 394 @Test 395 public void testCompactionWithCorruptBlock() throws Exception { 396 createStoreFile(r, Bytes.toString(FAMILY)); 397 createStoreFile(r, Bytes.toString(FAMILY)); 398 HStore store = r.getStore(FAMILY); 399 400 Collection<HStoreFile> storeFiles = store.getStorefiles(); 401 DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor(); 402 CompactionRequestImpl request = new CompactionRequestImpl(storeFiles); 403 tool.compact(request, NoLimitThroughputController.INSTANCE, null); 404 405 // insert the hfile with a corrupted data block into the region's tmp directory, where 406 // compaction output is collected. 407 FileSystem fs = store.getFileSystem(); 408 Path tmpPath = store.getRegionFileSystem().createTempName(); 409 try ( 410 InputStream inputStream = 411 getClass().getResourceAsStream("TestCompaction_HFileWithCorruptBlock.gz"); 412 GZIPInputStream gzipInputStream = new GZIPInputStream(Objects.requireNonNull(inputStream)); 413 OutputStream outputStream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) { 414 assertThat(gzipInputStream, notNullValue()); 415 assertThat(outputStream, notNullValue()); 416 IOUtils.copyBytes(gzipInputStream, outputStream, 512); 417 } 418 LoggerFactory.getLogger(TestCompaction.class).info("Wrote corrupted HFile to {}", tmpPath); 419 420 // The complete compaction should fail and the corrupt file should remain 421 // in the 'tmp' directory; 422 try { 423 store.doCompaction(request, storeFiles, null, EnvironmentEdgeManager.currentTime(), 424 Collections.singletonList(tmpPath)); 425 } catch (IOException e) { 426 Throwable rootCause = e; 427 while (rootCause.getCause() != null) { 428 rootCause = rootCause.getCause(); 429 } 430 assertThat(rootCause, allOf(instanceOf(IOException.class), 431 hasProperty("message", containsString("not a gzip file")))); 432 assertTrue(fs.exists(tmpPath)); 433 return; 434 } 435 fail("Compaction should have failed due to corrupt block"); 436 } 437 438 /** 439 * Create a custom compaction request and be sure that we can track it through the queue, knowing 440 * when the compaction is completed. 441 */ 442 @Test 443 public void testTrackingCompactionRequest() throws Exception { 444 // setup a compact/split thread on a mock server 445 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 446 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 447 CompactSplit thread = new CompactSplit(mockServer); 448 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 449 450 // setup a region/store with some files 451 HStore store = r.getStore(COLUMN_FAMILY); 452 createStoreFile(r); 453 for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) { 454 createStoreFile(r); 455 } 456 457 CountDownLatch latch = new CountDownLatch(1); 458 Tracker tracker = new Tracker(latch); 459 thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, tracker, null); 460 // wait for the latch to complete. 461 latch.await(); 462 463 thread.interruptIfNecessary(); 464 } 465 466 @Test 467 public void testCompactionFailure() throws Exception { 468 // setup a compact/split thread on a mock server 469 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 470 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 471 CompactSplit thread = new CompactSplit(mockServer); 472 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 473 474 // setup a region/store with some files 475 HStore store = r.getStore(COLUMN_FAMILY); 476 createStoreFile(r); 477 for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) { 478 createStoreFile(r); 479 } 480 481 HRegion mockRegion = Mockito.spy(r); 482 Mockito.when(mockRegion.checkSplit()) 483 .thenThrow(new RuntimeException("Thrown intentionally by test!")); 484 485 try (MetricsRegionWrapperImpl metricsWrapper = new MetricsRegionWrapperImpl(r)) { 486 487 long preCompletedCount = metricsWrapper.getNumCompactionsCompleted(); 488 long preFailedCount = metricsWrapper.getNumCompactionsFailed(); 489 490 CountDownLatch latch = new CountDownLatch(1); 491 Tracker tracker = new Tracker(latch); 492 thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER, tracker, 493 null); 494 // wait for the latch to complete. 495 latch.await(120, TimeUnit.SECONDS); 496 497 // compaction should have completed and been marked as failed due to error in split request 498 long postCompletedCount = metricsWrapper.getNumCompactionsCompleted(); 499 long postFailedCount = metricsWrapper.getNumCompactionsFailed(); 500 501 assertTrue("Completed count should have increased (pre=" + preCompletedCount + ", post=" 502 + postCompletedCount + ")", postCompletedCount > preCompletedCount); 503 assertTrue("Failed count should have increased (pre=" + preFailedCount + ", post=" 504 + postFailedCount + ")", postFailedCount > preFailedCount); 505 } 506 } 507 508 /** 509 * Test no new Compaction requests are generated after calling stop compactions 510 */ 511 @Test 512 public void testStopStartCompaction() throws IOException { 513 // setup a compact/split thread on a mock server 514 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 515 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 516 final CompactSplit thread = new CompactSplit(mockServer); 517 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 518 // setup a region/store with some files 519 HStore store = r.getStore(COLUMN_FAMILY); 520 createStoreFile(r); 521 for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) { 522 createStoreFile(r); 523 } 524 thread.switchCompaction(false); 525 thread.requestCompaction(r, store, "test", Store.PRIORITY_USER, 526 CompactionLifeCycleTracker.DUMMY, null); 527 assertFalse(thread.isCompactionsEnabled()); 528 int longCompactions = thread.getLongCompactions().getActiveCount(); 529 int shortCompactions = thread.getShortCompactions().getActiveCount(); 530 assertEquals( 531 "longCompactions=" + longCompactions + "," + "shortCompactions=" + shortCompactions, 0, 532 longCompactions + shortCompactions); 533 thread.switchCompaction(true); 534 assertTrue(thread.isCompactionsEnabled()); 535 // Make sure no compactions have run. 536 assertEquals(0, thread.getLongCompactions().getCompletedTaskCount() 537 + thread.getShortCompactions().getCompletedTaskCount()); 538 // Request a compaction and make sure it is submitted successfully. 539 thread.requestCompaction(r, store, "test", Store.PRIORITY_USER, 540 CompactionLifeCycleTracker.DUMMY, null); 541 // Wait until the compaction finishes. 542 Waiter.waitFor(UTIL.getConfiguration(), 5000, 543 (Waiter.Predicate<Exception>) () -> thread.getLongCompactions().getCompletedTaskCount() 544 + thread.getShortCompactions().getCompletedTaskCount() == 1); 545 // Make sure there are no compactions running. 546 assertEquals(0, 547 thread.getLongCompactions().getActiveCount() + thread.getShortCompactions().getActiveCount()); 548 } 549 550 @Test 551 public void testInterruptingRunningCompactions() throws Exception { 552 // setup a compact/split thread on a mock server 553 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 554 WaitThroughPutController.class.getName()); 555 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 556 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 557 CompactSplit thread = new CompactSplit(mockServer); 558 559 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 560 561 // setup a region/store with some files 562 HStore store = r.getStore(COLUMN_FAMILY); 563 int jmax = (int) Math.ceil(15.0 / compactionThreshold); 564 byte[] pad = new byte[1000]; // 1 KB chunk 565 for (int i = 0; i < compactionThreshold; i++) { 566 Table loader = new RegionAsTable(r); 567 Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); 568 p.setDurability(Durability.SKIP_WAL); 569 for (int j = 0; j < jmax; j++) { 570 p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad); 571 } 572 HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY)); 573 loader.put(p); 574 r.flush(true); 575 } 576 HStore s = r.getStore(COLUMN_FAMILY); 577 int initialFiles = s.getStorefilesCount(); 578 579 thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, 580 CompactionLifeCycleTracker.DUMMY, null); 581 582 Thread.sleep(3000); 583 thread.switchCompaction(false); 584 assertEquals(initialFiles, s.getStorefilesCount()); 585 // don't mess up future tests 586 thread.switchCompaction(true); 587 } 588 589 /** 590 * HBASE-7947: Regression test to ensure adding to the correct list in the {@link CompactSplit} 591 * @throws Exception on failure 592 */ 593 @Test 594 public void testMultipleCustomCompactionRequests() throws Exception { 595 // setup a compact/split thread on a mock server 596 HRegionServer mockServer = Mockito.mock(HRegionServer.class); 597 Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); 598 CompactSplit thread = new CompactSplit(mockServer); 599 Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); 600 601 // setup a region/store with some files 602 int numStores = r.getStores().size(); 603 CountDownLatch latch = new CountDownLatch(numStores); 604 Tracker tracker = new Tracker(latch); 605 // create some store files and setup requests for each store on which we want to do a 606 // compaction 607 for (HStore store : r.getStores()) { 608 createStoreFile(r, store.getColumnFamilyName()); 609 createStoreFile(r, store.getColumnFamilyName()); 610 createStoreFile(r, store.getColumnFamilyName()); 611 thread.requestCompaction(r, store, "test mulitple custom comapctions", PRIORITY_USER, tracker, 612 null); 613 } 614 // wait for the latch to complete. 615 latch.await(); 616 617 thread.interruptIfNecessary(); 618 } 619 620 class StoreMockMaker extends StatefulStoreMockMaker { 621 public ArrayList<HStoreFile> compacting = new ArrayList<>(); 622 public ArrayList<HStoreFile> notCompacting = new ArrayList<>(); 623 private final ArrayList<Integer> results; 624 625 public StoreMockMaker(ArrayList<Integer> results) { 626 this.results = results; 627 } 628 629 public class TestCompactionContext extends CompactionContext { 630 631 private List<HStoreFile> selectedFiles; 632 633 public TestCompactionContext(List<HStoreFile> selectedFiles) { 634 super(); 635 this.selectedFiles = selectedFiles; 636 } 637 638 @Override 639 public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) { 640 return new ArrayList<>(); 641 } 642 643 @Override 644 public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction, 645 boolean mayUseOffPeak, boolean forceMajor) throws IOException { 646 this.request = new CompactionRequestImpl(selectedFiles); 647 this.request.setPriority(getPriority()); 648 return true; 649 } 650 651 @Override 652 public List<Path> compact(ThroughputController throughputController, User user) 653 throws IOException { 654 finishCompaction(this.selectedFiles); 655 return new ArrayList<>(); 656 } 657 } 658 659 @Override 660 public synchronized Optional<CompactionContext> selectCompaction() { 661 CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting)); 662 compacting.addAll(notCompacting); 663 notCompacting.clear(); 664 try { 665 ctx.select(null, false, false, false); 666 } catch (IOException ex) { 667 fail("Shouldn't happen"); 668 } 669 return Optional.of(ctx); 670 } 671 672 @Override 673 public synchronized void cancelCompaction(Object object) { 674 TestCompactionContext ctx = (TestCompactionContext) object; 675 compacting.removeAll(ctx.selectedFiles); 676 notCompacting.addAll(ctx.selectedFiles); 677 } 678 679 public synchronized void finishCompaction(List<HStoreFile> sfs) { 680 if (sfs.isEmpty()) return; 681 synchronized (results) { 682 results.add(sfs.size()); 683 } 684 compacting.removeAll(sfs); 685 } 686 687 @Override 688 public int getPriority() { 689 return 7 - compacting.size() - notCompacting.size(); 690 } 691 } 692 693 public class BlockingStoreMockMaker extends StatefulStoreMockMaker { 694 BlockingCompactionContext blocked = null; 695 696 public class BlockingCompactionContext extends CompactionContext { 697 public volatile boolean isInCompact = false; 698 699 public void unblock() { 700 synchronized (this) { 701 this.notifyAll(); 702 } 703 } 704 705 @Override 706 public List<Path> compact(ThroughputController throughputController, User user) 707 throws IOException { 708 try { 709 isInCompact = true; 710 synchronized (this) { 711 this.wait(); 712 } 713 } catch (InterruptedException e) { 714 Assume.assumeNoException(e); 715 } 716 return new ArrayList<>(); 717 } 718 719 @Override 720 public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) { 721 return new ArrayList<>(); 722 } 723 724 @Override 725 public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e) 726 throws IOException { 727 this.request = new CompactionRequestImpl(new ArrayList<>()); 728 return true; 729 } 730 } 731 732 @Override 733 public Optional<CompactionContext> selectCompaction() { 734 this.blocked = new BlockingCompactionContext(); 735 try { 736 this.blocked.select(null, false, false, false); 737 } catch (IOException ex) { 738 fail("Shouldn't happen"); 739 } 740 return Optional.of(blocked); 741 } 742 743 @Override 744 public void cancelCompaction(Object object) { 745 } 746 747 @Override 748 public int getPriority() { 749 return Integer.MIN_VALUE; // some invalid value, see createStoreMock 750 } 751 752 public BlockingCompactionContext waitForBlocking() { 753 while (this.blocked == null || !this.blocked.isInCompact) { 754 Threads.sleepWithoutInterrupt(50); 755 } 756 BlockingCompactionContext ctx = this.blocked; 757 this.blocked = null; 758 return ctx; 759 } 760 761 @Override 762 public HStore createStoreMock(String name) throws Exception { 763 return createStoreMock(Integer.MIN_VALUE, name); 764 } 765 766 public HStore createStoreMock(int priority, String name) throws Exception { 767 // Override the mock to always return the specified priority. 768 HStore s = super.createStoreMock(name); 769 when(s.getCompactPriority()).thenReturn(priority); 770 return s; 771 } 772 } 773 774 /** Test compaction priority management and multiple compactions per store (HBASE-8665). */ 775 @Test 776 public void testCompactionQueuePriorities() throws Exception { 777 // Setup a compact/split thread on a mock server. 778 final Configuration conf = HBaseConfiguration.create(); 779 HRegionServer mockServer = mock(HRegionServer.class); 780 when(mockServer.isStopped()).thenReturn(false); 781 when(mockServer.getConfiguration()).thenReturn(conf); 782 when(mockServer.getChoreService()).thenReturn(new ChoreService("test")); 783 CompactSplit cst = new CompactSplit(mockServer); 784 when(mockServer.getCompactSplitThread()).thenReturn(cst); 785 // prevent large compaction thread pool stealing job from small compaction queue. 786 cst.shutdownLongCompactions(); 787 // Set up the region mock that redirects compactions. 788 HRegion r = mock(HRegion.class); 789 when(r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() { 790 @Override 791 public Boolean answer(InvocationOnMock invocation) throws Throwable { 792 invocation.<CompactionContext> getArgument(0).compact(invocation.getArgument(2), null); 793 return true; 794 } 795 }); 796 797 // Set up store mocks for 2 "real" stores and the one we use for blocking CST. 798 ArrayList<Integer> results = new ArrayList<>(); 799 StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results); 800 HStore store = sm.createStoreMock("store1"); 801 HStore store2 = sm2.createStoreMock("store2"); 802 BlockingStoreMockMaker blocker = new BlockingStoreMockMaker(); 803 804 // First, block the compaction thread so that we could muck with queue. 805 cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1"); 806 BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking(); 807 808 // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively. 809 for (int i = 0; i < 4; ++i) { 810 sm.notCompacting.add(createFile()); 811 } 812 cst.requestSystemCompaction(r, store, "s1-pri3"); 813 for (int i = 0; i < 3; ++i) { 814 sm2.notCompacting.add(createFile()); 815 } 816 cst.requestSystemCompaction(r, store2, "s2-pri4"); 817 // Now add 2 more files to store1 and queue compaction - pri 1. 818 for (int i = 0; i < 2; ++i) { 819 sm.notCompacting.add(createFile()); 820 } 821 cst.requestSystemCompaction(r, store, "s1-pri1"); 822 // Finally add blocking compaction with priority 2. 823 cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2"); 824 825 // Unblock the blocking compaction; we should run pri1 and become block again in pri2. 826 currentBlock.unblock(); 827 currentBlock = blocker.waitForBlocking(); 828 // Pri1 should have "compacted" all 6 files. 829 assertEquals(1, results.size()); 830 assertEquals(6, results.get(0).intValue()); 831 // Add 2 files to store 1 (it has 2 files now). 832 for (int i = 0; i < 2; ++i) { 833 sm.notCompacting.add(createFile()); 834 } 835 // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority 836 // is 5, however, so it must not preempt store 2. Add blocking compaction at the end. 837 cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7"); 838 currentBlock.unblock(); 839 currentBlock = blocker.waitForBlocking(); 840 assertEquals(3, results.size()); 841 assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files. 842 assertEquals(2, results.get(2).intValue()); 843 844 currentBlock.unblock(); 845 cst.interruptIfNecessary(); 846 } 847 848 /** 849 * Firstly write 10 cells (with different time stamp) to a qualifier and flush to hfile1, then 850 * write 10 cells (with different time stamp) to the same qualifier and flush to hfile2. The 851 * latest cell (cell-A) in hfile1 and the oldest cell (cell-B) in hfile2 are with the same time 852 * stamp but different sequence id, and will get scanned successively during compaction. 853 * <p/> 854 * We set compaction.kv.max to 10 so compaction will scan 10 versions each round, meanwhile we set 855 * keepSeqIdPeriod=0 in {@link DummyCompactor} so all 10 versions of hfile2 will be written out 856 * with seqId cleaned (set to 0) including cell-B, then when scanner goes to cell-A it will cause 857 * a scan out-of-order assertion error before HBASE-16931 if error occurs during the test 858 */ 859 @Test 860 public void testCompactionSeqId() throws Exception { 861 final byte[] ROW = Bytes.toBytes("row"); 862 final byte[] QUALIFIER = Bytes.toBytes("qualifier"); 863 864 long timestamp = 10000; 865 866 // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9 867 // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8 868 // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7 869 // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6 870 // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5 871 // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4 872 // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3 873 // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2 874 // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1 875 // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0 876 for (int i = 0; i < 10; i++) { 877 Put put = new Put(ROW); 878 put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i)); 879 r.put(put); 880 } 881 r.flush(true); 882 883 // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18 884 // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17 885 // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16 886 // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15 887 // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14 888 // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13 889 // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12 890 // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11 891 // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10 892 // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9 893 for (int i = 18; i > 8; i--) { 894 Put put = new Put(ROW); 895 put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i)); 896 r.put(put); 897 } 898 r.flush(true); 899 r.compact(true); 900 } 901 902 public static class DummyCompactor extends DefaultCompactor { 903 public DummyCompactor(Configuration conf, HStore store) { 904 super(conf, store); 905 this.keepSeqIdPeriod = 0; 906 } 907 } 908 909 private static HStoreFile createFile() throws Exception { 910 HStoreFile sf = mock(HStoreFile.class); 911 when(sf.getPath()).thenReturn(new Path("file")); 912 StoreFileReader r = mock(StoreFileReader.class); 913 when(r.length()).thenReturn(10L); 914 when(sf.getReader()).thenReturn(r); 915 return sf; 916 } 917 918 /** 919 * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction 920 * finishes. 921 */ 922 public static class Tracker implements CompactionLifeCycleTracker { 923 924 private final CountDownLatch done; 925 926 public Tracker(CountDownLatch done) { 927 this.done = done; 928 } 929 930 @Override 931 public void afterExecution(Store store) { 932 done.countDown(); 933 } 934 } 935 936 /** 937 * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction 938 * finishes. 939 */ 940 public static class WaitThroughPutController extends NoLimitThroughputController { 941 942 public WaitThroughPutController() { 943 } 944 945 @Override 946 public long control(String compactionName, long size) throws InterruptedException { 947 Thread.sleep(6000000); 948 return 6000000; 949 } 950 } 951}