/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY;
import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Test the compaction framework and common functions.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestCompaction {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCompaction.class);

  @Rule
  public TestName name = new TestName();
  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  protected Configuration conf = UTIL.getConfiguration();

  private HRegion r = null;
  private HTableDescriptor htd = null;
  private static final byte[] COLUMN_FAMILY = fam1;
  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
  private static final byte[] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
  private int compactionThreshold;
  private byte[] secondRowBytes, thirdRowBytes;
  private static final long MAX_FILES_TO_COMPACT = 10;
  private final byte[] FAMILY = Bytes.toBytes("cf");

  /** constructor */
  public TestCompaction() {
    super();

    // Set cache flush size to 1MB
    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
    conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
    conf.setLong(HConstants.COMPACTION_SCANNER_SIZE_MAX, 10L);
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      NoLimitThroughputController.class.getName());
    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);

    secondRowBytes = START_KEY_BYTES.clone();
    // Increment the least significant character so we get to the next row.
    secondRowBytes[START_KEY_BYTES.length - 1]++;
    thirdRowBytes = START_KEY_BYTES.clone();
    thirdRowBytes[START_KEY_BYTES.length - 1] =
      (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
  }

  @Before
  public void setUp() throws Exception {
    this.htd = UTIL.createTableDescriptor(name.getMethodName());
    if (name.getMethodName().equals("testCompactionSeqId")) {
      UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
      UTIL.getConfiguration().set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
        DummyCompactor.class.getName());
      HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
      hcd.setMaxVersions(65536);
      this.htd.addFamily(hcd);
    }
    this.r = UTIL.createLocalHRegion(htd, null, null);
  }

  @After
  public void tearDown() throws Exception {
    WAL wal = r.getWAL();
    this.r.close();
    wal.close();
  }

  /**
   * Verify that you can stop a long-running compaction (used during RS shutdown).
   */
  @Test
  public void testInterruptCompactionBySize() throws Exception {
    assertEquals(0, count());

    // lower the polling interval for this test
    conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 /* 10 KB */);

    try {
      // Create a couple of store files of ~15KB each (over the 10KB interval)
      int jmax = (int) Math.ceil(15.0 / compactionThreshold);
      byte[] pad = new byte[1000]; // 1 KB chunk
      for (int i = 0; i < compactionThreshold; i++) {
        Table loader = new RegionAsTable(r);
        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
        p.setDurability(Durability.SKIP_WAL);
        for (int j = 0; j < jmax; j++) {
          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
        }
        HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
        loader.put(p);
        r.flush(true);
      }

      HRegion spyR = spy(r);
      doAnswer(new Answer<Object>() {
        @Override
        public Object answer(InvocationOnMock invocation) throws Throwable {
          // Disable writes so the compaction sees a close request and aborts.
          r.writestate.writesEnabled = false;
          return invocation.callRealMethod();
        }
      }).when(spyR).doRegionCompactionPrep();

      // force a minor compaction, but not before requesting a stop
      spyR.compactStores();

      // ensure that the compaction stopped, all old files are intact,
      HStore s = r.getStore(COLUMN_FAMILY);
      assertEquals(compactionThreshold, s.getStorefilesCount());
      assertTrue(s.getStorefilesSize() > 15 * 1000);
      // and no new store files persisted past compactStores();
      // only one empty dir exists in the temp dir
      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
      assertEquals(1, ls.length);
      Path storeTempDir =
        new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
      assertTrue(r.getFilesystem().exists(storeTempDir));
      ls = r.getFilesystem().listStatus(storeTempDir);
      assertEquals(0, ls.length);
    } finally {
      // don't mess up future tests
      r.writestate.writesEnabled = true;
      conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);

      // Delete all store data once we are done with it
      for (int i = 0; i < compactionThreshold; i++) {
        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
        byte[][] famAndQf = { COLUMN_FAMILY, null };
        delete.addFamily(famAndQf[0]);
        r.delete(delete);
      }
      r.flush(true);

      // Multiple versions allowed for an entry, so the delete isn't enough.
      // Lower the TTL and let the entries expire to ensure they have all been wiped.
      final int ttl = 1000;
      for (HStore store : this.r.stores.values()) {
        ScanInfo old = store.getScanInfo();
        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
        store.setScanInfo(si);
      }
      Thread.sleep(ttl);

      r.compact(true);
      assertEquals(0, count());
    }
  }

  @Test
  public void testInterruptCompactionByTime() throws Exception {
    assertEquals(0, count());

    // lower the polling interval for this test
    conf.setLong(TIME_LIMIT_KEY, 1 /* 1ms */);

    try {
      // Create a couple of store files of ~15KB each
      int jmax = (int) Math.ceil(15.0 / compactionThreshold);
      byte[] pad = new byte[1000]; // 1 KB chunk
      for (int i = 0; i < compactionThreshold; i++) {
        Table loader = new RegionAsTable(r);
        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
        p.setDurability(Durability.SKIP_WAL);
        for (int j = 0; j < jmax; j++) {
          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
        }
        HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
        loader.put(p);
        r.flush(true);
      }

      HRegion spyR = spy(r);
      doAnswer(new Answer<Object>() {
        @Override
        public Object answer(InvocationOnMock invocation) throws Throwable {
          // Disable writes so the compaction sees a close request and aborts.
          r.writestate.writesEnabled = false;
          return invocation.callRealMethod();
        }
      }).when(spyR).doRegionCompactionPrep();

      // force a minor compaction, but not before requesting a stop
      spyR.compactStores();

      // ensure that the compaction stopped, all old files are intact,
      HStore s = r.getStore(COLUMN_FAMILY);
      assertEquals(compactionThreshold, s.getStorefilesCount());
      assertTrue(s.getStorefilesSize() > 15 * 1000);
      // and no new store files persisted past compactStores();
      // only one empty dir exists in the temp dir
      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
      assertEquals(1, ls.length);
      Path storeTempDir =
        new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
      assertTrue(r.getFilesystem().exists(storeTempDir));
      ls = r.getFilesystem().listStatus(storeTempDir);
      assertEquals(0, ls.length);
    } finally {
      // don't mess up future tests
      r.writestate.writesEnabled = true;
      conf.setLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */);

      // Delete all store data once we are done with it
      for (int i = 0; i < compactionThreshold; i++) {
        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
        byte[][] famAndQf = { COLUMN_FAMILY, null };
        delete.addFamily(famAndQf[0]);
        r.delete(delete);
      }
      r.flush(true);

      // Multiple versions allowed for an entry, so the delete isn't enough.
      // Lower the TTL and let the entries expire to ensure they have all been wiped.
      final int ttl = 1000;
      for (HStore store : this.r.stores.values()) {
        ScanInfo old = store.getScanInfo();
        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
        store.setScanInfo(si);
      }
      Thread.sleep(ttl);

      r.compact(true);
      assertEquals(0, count());
    }
  }

  /** Counts the total number of cells across all store files of {@link #COLUMN_FAMILY_TEXT}. */
  private int count() throws IOException {
    int count = 0;
    for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
      HFileScanner scanner = f.getReader().getScanner(false, false);
      if (!scanner.seekTo()) {
        continue;
      }
      do {
        count++;
      } while (scanner.next());
    }
    return count;
  }
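
  /**
   * Creates one new store file: loads the standard test content into the default column family
   * and flushes the region.
   */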
  private void createStoreFile(final HRegion region) throws IOException {
    createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
  }

  private void createStoreFile(final HRegion region, String family) throws IOException {
    Table loader = new RegionAsTable(region);
    HTestConst.addContent(loader, family);
    region.flush(true);
  }

  @Test
  public void testCompactionWithCorruptResult() throws Exception {
    int nfiles = 10;
    for (int i = 0; i < nfiles; i++) {
      createStoreFile(r);
    }
    HStore store = r.getStore(COLUMN_FAMILY);

    Collection<HStoreFile> storeFiles = store.getStorefiles();
    DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor();
    CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
    tool.compact(request, NoLimitThroughputController.INSTANCE, null);

    // Now let's corrupt the compacted file.
    FileSystem fs = store.getFileSystem();
    // The default compaction policy created one and only one new compacted file.
    Path tmpPath = store.getRegionFileSystem().createTempName();
    try (FSDataOutputStream stream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) {
      stream.writeChars("CORRUPT FILE!!!!");
    }
    // The complete compaction should fail and the corrupt file should remain
    // in the 'tmp' directory.
    assertThrows(IOException.class, () -> store.doCompaction(null, null, null,
      EnvironmentEdgeManager.currentTime(), Collections.singletonList(tmpPath)));
    assertTrue(fs.exists(tmpPath));
  }

  /**
   * Create a custom compaction request and be sure that we can track it through the queue, knowing
   * when the compaction is completed.
   */
  @Test
  public void testTrackingCompactionRequest() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
      createStoreFile(r);
    }

    CountDownLatch latch = new CountDownLatch(1);
    Tracker tracker = new Tracker(latch);
    thread.requestCompaction(r, store, "test custom compaction", PRIORITY_USER, tracker, null);
    // wait for the latch to complete.
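    // The Tracker (defined at the bottom of this class) counts the latch down from
    // afterExecution(), so await() returns once the requested compaction has finished.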
    latch.await();

    thread.interruptIfNecessary();
  }

  @Test
  public void testCompactionFailure() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
      createStoreFile(r);
    }

    HRegion mockRegion = Mockito.spy(r);
    Mockito.when(mockRegion.checkSplit())
      .thenThrow(new RuntimeException("Thrown intentionally by test!"));

    try (MetricsRegionWrapperImpl metricsWrapper = new MetricsRegionWrapperImpl(r)) {

      long preCompletedCount = metricsWrapper.getNumCompactionsCompleted();
      long preFailedCount = metricsWrapper.getNumCompactionsFailed();

      CountDownLatch latch = new CountDownLatch(1);
      Tracker tracker = new Tracker(latch);
      thread.requestCompaction(mockRegion, store, "test custom compaction", PRIORITY_USER, tracker,
        null);
      // wait for the latch to complete.
      latch.await(120, TimeUnit.SECONDS);

      // The compaction should have completed, and should have been marked as failed because of
      // the error thrown by the split request.
      long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
      long postFailedCount = metricsWrapper.getNumCompactionsFailed();

      assertTrue("Completed count should have increased (pre=" + preCompletedCount + ", post="
        + postCompletedCount + ")", postCompletedCount > preCompletedCount);
      assertTrue("Failed count should have increased (pre=" + preFailedCount + ", post="
        + postFailedCount + ")", postFailedCount > preFailedCount);
    }
  }

  /**
   * Test that no new compaction requests are generated after calling stop compactions.
   */
  @Test
  public void testStopStartCompaction() throws IOException {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    final CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
      createStoreFile(r);
    }
    thread.switchCompaction(false);
    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, null);
    assertFalse(thread.isCompactionsEnabled());
    int longCompactions = thread.getLongCompactions().getActiveCount();
    int shortCompactions = thread.getShortCompactions().getActiveCount();
    assertEquals(
      "longCompactions=" + longCompactions + "," + "shortCompactions=" + shortCompactions, 0,
      longCompactions + shortCompactions);
    thread.switchCompaction(true);
    assertTrue(thread.isCompactionsEnabled());
    // Make sure no compactions have run.
    assertEquals(0, thread.getLongCompactions().getCompletedTaskCount()
      + thread.getShortCompactions().getCompletedTaskCount());
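    // The request made while compactions were switched off must have been dropped rather than
    // queued; re-enabling compactions on its own should not resurrect it.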
    // Request a compaction and make sure it is submitted successfully.
    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, null);
    // Wait until the compaction finishes.
    Waiter.waitFor(UTIL.getConfiguration(), 5000,
      (Waiter.Predicate<Exception>) () -> thread.getLongCompactions().getCompletedTaskCount()
        + thread.getShortCompactions().getCompletedTaskCount() == 1);
    // Make sure there are no compactions running.
    assertEquals(0,
      thread.getLongCompactions().getActiveCount() + thread.getShortCompactions().getActiveCount());
  }

  @Test
  public void testInterruptingRunningCompactions() throws Exception {
    // setup a compact/split thread on a mock server
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      WaitThroughPutController.class.getName());
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);

    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    int jmax = (int) Math.ceil(15.0 / compactionThreshold);
    byte[] pad = new byte[1000]; // 1 KB chunk
    for (int i = 0; i < compactionThreshold; i++) {
      Table loader = new RegionAsTable(r);
      Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
      p.setDurability(Durability.SKIP_WAL);
      for (int j = 0; j < jmax; j++) {
        p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
      }
      HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
      loader.put(p);
      r.flush(true);
    }
    HStore s = r.getStore(COLUMN_FAMILY);
    int initialFiles = s.getStorefilesCount();

    thread.requestCompaction(r, store, "test custom compaction", PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, null);

    // Give the compaction time to start; it then parks inside WaitThroughPutController.
    Thread.sleep(3000);
    // Disabling compactions should interrupt the running one, leaving the store files untouched.
    thread.switchCompaction(false);
    assertEquals(initialFiles, s.getStorefilesCount());
    // don't mess up future tests
    thread.switchCompaction(true);
  }

  /**
   * HBASE-7947: Regression test to ensure we add to the correct list in the {@link CompactSplit}.
   * @throws Exception on failure
   */
  @Test
  public void testMultipleCustomCompactionRequests() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    int numStores = r.getStores().size();
    CountDownLatch latch = new CountDownLatch(numStores);
    Tracker tracker = new Tracker(latch);
    // create some store files and setup requests for each store on which we want to do a
    // compaction
    for (HStore store : r.getStores()) {
      createStoreFile(r, store.getColumnFamilyName());
      createStoreFile(r, store.getColumnFamilyName());
      createStoreFile(r, store.getColumnFamilyName());
      thread.requestCompaction(r, store, "test multiple custom compactions", PRIORITY_USER,
        tracker, null);
    }
    // wait for the latch to complete.
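    // A single latch is shared across the stores: each store's compaction counts it down once, so
    // it only opens after all of the requested compactions have finished.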
    latch.await();

    thread.interruptIfNecessary();
  }

  class StoreMockMaker extends StatefulStoreMockMaker {
    public ArrayList<HStoreFile> compacting = new ArrayList<>();
    public ArrayList<HStoreFile> notCompacting = new ArrayList<>();
    private final ArrayList<Integer> results;

    public StoreMockMaker(ArrayList<Integer> results) {
      this.results = results;
    }

    public class TestCompactionContext extends CompactionContext {

      private List<HStoreFile> selectedFiles;

      public TestCompactionContext(List<HStoreFile> selectedFiles) {
        super();
        this.selectedFiles = selectedFiles;
      }

      @Override
      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
        return new ArrayList<>();
      }

      @Override
      public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
        boolean mayUseOffPeak, boolean forceMajor) throws IOException {
        this.request = new CompactionRequestImpl(selectedFiles);
        this.request.setPriority(getPriority());
        return true;
      }

      @Override
      public List<Path> compact(ThroughputController throughputController, User user)
        throws IOException {
        finishCompaction(this.selectedFiles);
        return new ArrayList<>();
      }
    }

    @Override
    public synchronized Optional<CompactionContext> selectCompaction() {
      CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting));
      compacting.addAll(notCompacting);
      notCompacting.clear();
      try {
        ctx.select(null, false, false, false);
      } catch (IOException ex) {
        fail("Shouldn't happen");
      }
      return Optional.of(ctx);
    }

    @Override
    public synchronized void cancelCompaction(Object object) {
      TestCompactionContext ctx = (TestCompactionContext) object;
      compacting.removeAll(ctx.selectedFiles);
      notCompacting.addAll(ctx.selectedFiles);
    }

    public synchronized void finishCompaction(List<HStoreFile> sfs) {
      if (sfs.isEmpty()) {
        return;
      }
      synchronized (results) {
        results.add(sfs.size());
      }
      compacting.removeAll(sfs);
    }

    @Override
    public int getPriority() {
      return 7 - compacting.size() - notCompacting.size();
    }
  }

  public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
    BlockingCompactionContext blocked = null;

    public class BlockingCompactionContext extends CompactionContext {
      public volatile boolean isInCompact = false;

      public void unblock() {
        synchronized (this) {
          this.notifyAll();
        }
      }

      @Override
      public List<Path> compact(ThroughputController throughputController, User user)
        throws IOException {
        try {
          isInCompact = true;
          synchronized (this) {
            this.wait();
          }
        } catch (InterruptedException e) {
          Assume.assumeNoException(e);
        }
        return new ArrayList<>();
      }

      @Override
      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
        return new ArrayList<>();
      }

      @Override
      public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e)
        throws IOException {
        this.request = new CompactionRequestImpl(new ArrayList<>());
        return true;
      }
    }

    @Override
    public Optional<CompactionContext> selectCompaction() {
      this.blocked = new BlockingCompactionContext();
      try {
        this.blocked.select(null, false, false, false);
      } catch (IOException ex) {
        fail("Shouldn't happen");
      }
      return Optional.of(blocked);
    }
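
    // Cancellation is deliberately a no-op in this mock; a blocked context is only ever released
    // explicitly via unblock().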
    @Override
    public void cancelCompaction(Object object) {
    }

    @Override
    public int getPriority() {
      return Integer.MIN_VALUE; // some invalid value, see createStoreMock
    }

    public BlockingCompactionContext waitForBlocking() {
      while (this.blocked == null || !this.blocked.isInCompact) {
        Threads.sleepWithoutInterrupt(50);
      }
      BlockingCompactionContext ctx = this.blocked;
      this.blocked = null;
      return ctx;
    }

    @Override
    public HStore createStoreMock(String name) throws Exception {
      return createStoreMock(Integer.MIN_VALUE, name);
    }

    public HStore createStoreMock(int priority, String name) throws Exception {
      // Override the mock to always return the specified priority.
      HStore s = super.createStoreMock(name);
      when(s.getCompactPriority()).thenReturn(priority);
      return s;
    }
  }

  /** Test compaction priority management and multiple compactions per store (HBASE-8665). */
  @Test
  public void testCompactionQueuePriorities() throws Exception {
    // Setup a compact/split thread on a mock server.
    final Configuration conf = HBaseConfiguration.create();
    HRegionServer mockServer = mock(HRegionServer.class);
    when(mockServer.isStopped()).thenReturn(false);
    when(mockServer.getConfiguration()).thenReturn(conf);
    when(mockServer.getChoreService()).thenReturn(new ChoreService("test"));
    CompactSplit cst = new CompactSplit(mockServer);
    when(mockServer.getCompactSplitThread()).thenReturn(cst);
    // Prevent the large compaction thread pool from stealing jobs from the small compaction queue.
    cst.shutdownLongCompactions();
    // Set up the region mock that redirects compactions.
    HRegion r = mock(HRegion.class);
    when(r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() {
      @Override
      public Boolean answer(InvocationOnMock invocation) throws Throwable {
        invocation.<CompactionContext> getArgument(0).compact(invocation.getArgument(2), null);
        return true;
      }
    });

    // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
    ArrayList<Integer> results = new ArrayList<>();
    StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
    HStore store = sm.createStoreMock("store1");
    HStore store2 = sm2.createStoreMock("store2");
    BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();

    // First, block the compaction thread so that we can muck with the queue.
    cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
    BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();

    // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
    for (int i = 0; i < 4; ++i) {
      sm.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store, "s1-pri3");
    for (int i = 0; i < 3; ++i) {
      sm2.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store2, "s2-pri4");
    // Now add 2 more files to store1 and queue another compaction - pri 1.
    for (int i = 0; i < 2; ++i) {
      sm.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store, "s1-pri1");
    // Finally add a blocking compaction with priority 2.
    cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");

    // Unblock the blocking compaction; pri1 should run next, and then we block again on pri2.
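    // Queue contents at this point, ordered by priority: s1-pri1 (1), b-pri2 (2), s1-pri3 (3),
    // s2-pri4 (4); the compaction thread itself is still parked inside b-pri1.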
    currentBlock.unblock();
    currentBlock = blocker.waitForBlocking();
    // Pri1 should have "compacted" all 6 files.
    assertEquals(1, results.size());
    assertEquals(6, results.get(0).intValue());
    // Add 2 files to store 1 (it has 2 files now).
    for (int i = 0; i < 2; ++i) {
      sm.notCompacting.add(createFile());
    }
    // Now we have pri4 for store 2 in the queue, and pri3 for store1; store1's current priority
    // is 5, however, so it must not preempt store 2. Add a blocking compaction at the end.
    cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
    currentBlock.unblock();
    currentBlock = blocker.waitForBlocking();
    assertEquals(3, results.size());
    assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files.
    assertEquals(2, results.get(2).intValue());

    currentBlock.unblock();
    cst.interruptIfNecessary();
  }

  /**
   * First write 10 cells (with different timestamps) to a qualifier and flush to hfile1, then
   * write 10 more cells (again with different timestamps) to the same qualifier and flush to
   * hfile2. The latest cell (cell-A) in hfile1 and the oldest cell (cell-B) in hfile2 carry the
   * same timestamp but different sequence ids, and get scanned back to back during compaction.
   * <p/>
   * We set compaction.kv.max to 10 so compaction will scan 10 versions each round, and we set
   * keepSeqIdPeriod=0 in {@link DummyCompactor} so all 10 versions of hfile2 are written out with
   * their seqId cleaned (set to 0), including cell-B. When the scanner then reaches cell-A, this
   * caused a scan out-of-order assertion error before HBASE-16931.
   */
  @Test
  public void testCompactionSeqId() throws Exception {
    final byte[] ROW = Bytes.toBytes("row");
    final byte[] QUALIFIER = Bytes.toBytes("qualifier");

    long timestamp = 10000;

    // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9
    // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8
    // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7
    // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6
    // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5
    // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4
    // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3
    // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2
    // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1
    // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0
    for (int i = 0; i < 10; i++) {
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
      r.put(put);
    }
    r.flush(true);

    // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18
    // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17
    // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16
    // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15
    // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14
    // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13
    // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12
    // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11
    // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10
    // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9
    for (int i = 18; i > 8; i--) {
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
      r.put(put);
    }
    r.flush(true);
    r.compact(true);
  }

  /**
   * A {@link DefaultCompactor} with keepSeqIdPeriod=0, so compaction output is written with
   * sequence ids cleaned (set to 0). Used by {@link #testCompactionSeqId()}.
   */
  public static class DummyCompactor extends DefaultCompactor {
    public DummyCompactor(Configuration conf, HStore store) {
      super(conf, store);
      this.keepSeqIdPeriod = 0;
    }
  }
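
  /**
   * Creates a mock store file whose reader just reports a fixed length; that is all the
   * queue-priority tests above need from a file.
   */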
  private static HStoreFile createFile() throws Exception {
    HStoreFile sf = mock(HStoreFile.class);
    when(sf.getPath()).thenReturn(new Path("file"));
    StoreFileReader r = mock(StoreFileReader.class);
    when(r.length()).thenReturn(10L);
    when(sf.getReader()).thenReturn(r);
    return sf;
  }

  /**
   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
   * finishes.
   */
  public static class Tracker implements CompactionLifeCycleTracker {

    private final CountDownLatch done;

    public Tracker(CountDownLatch done) {
      this.done = done;
    }

    @Override
    public void afterExecution(Store store) {
      done.countDown();
    }
  }

  /**
   * Simple {@link ThroughputController} that sleeps inside {@code control}, keeping a compaction
   * "running" for a very long time so that tests can interrupt it.
   */
  public static class WaitThroughPutController extends NoLimitThroughputController {

    public WaitThroughPutController() {
    }

    @Override
    public long control(String compactionName, long size) throws InterruptedException {
      Thread.sleep(6000000);
      return 6000000;
    }
  }
}