/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY_BYTES;
import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY;
import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Test compaction framework and common functions
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestCompaction {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCompaction.class);

  @Rule
  public TestName name = new TestName();
  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  protected Configuration conf = UTIL.getConfiguration();

  private HRegion r = null;
  private TableDescriptor tableDescriptor = null;
  private static final byte[] COLUMN_FAMILY = fam1;
  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
  private static final byte[] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
  private int compactionThreshold;
  private byte[] secondRowBytes, thirdRowBytes;
  private static final long MAX_FILES_TO_COMPACT = 10;
  private final byte[] FAMILY = Bytes.toBytes("cf");

  /** constructor */
  public TestCompaction() {
    super();

    // Set cache flush size to 1MB
    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
    conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
    conf.setLong(HConstants.COMPACTION_SCANNER_SIZE_MAX, 10L);
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      NoLimitThroughputController.class.getName());
    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);

    secondRowBytes = START_KEY_BYTES.clone();
    // Increment the least significant character so we get to next row.
    secondRowBytes[START_KEY_BYTES.length - 1]++;
    thirdRowBytes = START_KEY_BYTES.clone();
    thirdRowBytes[START_KEY_BYTES.length - 1] =
      (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
  }

  /**
   * Creates a fresh local region named after the running test method. For
   * {@code testCompactionSeqId} it additionally installs {@link DummyCompactor}, limits
   * {@code hbase.hstore.compaction.kv.max} to 10 and adds the {@code cf} family with 65536 versions.
   */
  @Before
  public void setUp() throws Exception {
    TableDescriptorBuilder builder = UTIL.createModifyableTableDescriptor(name.getMethodName());
    if (name.getMethodName().equals("testCompactionSeqId")) {
      UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
      UTIL.getConfiguration().set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
        DummyCompactor.class.getName());
      ColumnFamilyDescriptor familyDescriptor =
        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(65536).build();
      builder.setColumnFamily(familyDescriptor);
    }
    this.tableDescriptor = builder.build();
    this.r = UTIL.createLocalHRegion(tableDescriptor, null, null);
  }

  /**
   * Closes the region and then its WAL. The WAL is closed even when closing the region throws.
   */
  @After
  public void tearDown() throws Exception {
    if (r == null) {
      // setUp() failed before the region was created; nothing to release.
      return;
    }
    WAL wal = r.getWAL();
    try {
      this.r.close();
    } finally {
      wal.close();
    }
  }

  /**
   * Verify that you can stop a long-running compaction (used during RS shutdown)
   */
  @Test
  public void testInterruptCompactionBySize() throws Exception {
    assertEquals(0, count());

    // lower the polling interval for this test
    conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 /* 10 KB */);

    try {
      createPaddedStoreFiles();
      assertCompactionInterrupted();
    } finally {
      // don't mess up future tests
      r.writestate.writesEnabled = true;
      conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);
      wipeStoreData();
    }
  }

  /**
   * Same scenario as {@link #testInterruptCompactionBySize()}, but with the time based close-check
   * limit lowered to 1 ms instead of the size based one.
   */
  @Test
  public void testInterruptCompactionByTime() throws Exception {
    assertEquals(0, count());

    // lower the polling interval for this test
    conf.setLong(TIME_LIMIT_KEY, 1 /* 1ms */);

    try {
      createPaddedStoreFiles();
      assertCompactionInterrupted();
    } finally {
      // don't mess up future tests
      r.writestate.writesEnabled = true;
      conf.setLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */);
      wipeStoreData();
    }
  }

  /**
   * Flushes {@code compactionThreshold} store files into {@link #COLUMN_FAMILY}. Each flush
   * contains one row carrying {@code ceil(15 / compactionThreshold)} cells of 1000 bytes, so the
   * padded rows alone add up to at least 15 KB, plus the content written by
   * {@link HTestConst#addContent}.
   */
  private void createPaddedStoreFiles() throws IOException {
    // Create a couple store files w/ 15KB (over 10KB interval)
    int jmax = (int) Math.ceil(15.0 / compactionThreshold);
    byte[] pad = new byte[1000]; // 1 KB chunk
    for (int i = 0; i < compactionThreshold; i++) {
      Table loader = new RegionAsTable(r);
      Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
      p.setDurability(Durability.SKIP_WAL);
      for (int j = 0; j < jmax; j++) {
        p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
      }
      HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
      loader.put(p);
      r.flush(true);
    }
  }

  /**
   * Runs {@code compactStores()} on a spy of the region that disables writes right before the
   * compaction prep, then asserts that the store files are untouched and that the store's temp
   * directory exists but is empty.
   */
  private void assertCompactionInterrupted() throws Exception {
    HRegion spyR = spy(r);
    doAnswer(new Answer<Object>() {
      @Override
      public Object answer(InvocationOnMock invocation) throws Throwable {
        r.writestate.writesEnabled = false;
        return invocation.callRealMethod();
      }
    }).when(spyR).doRegionCompactionPrep();

    // force a minor compaction, but not before requesting a stop
    spyR.compactStores();

    // ensure that the compaction stopped, all old files are intact,
    HStore s = r.getStore(COLUMN_FAMILY);
    assertEquals(compactionThreshold, s.getStorefilesCount());
    assertTrue(s.getStorefilesSize() > 15 * 1000);
    // and no new store files persisted past compactStores()
    // only one empty dir exists in temp dir
    FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
    assertEquals(1, ls.length);
    Path storeTempDir =
      new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
    assertTrue(r.getFilesystem().exists(storeTempDir));
    ls = r.getFilesystem().listStatus(storeTempDir);
    assertEquals(0, ls.length);
  }

  /**
   * Removes everything {@link #createPaddedStoreFiles()} wrote: deletes the padded rows, flushes,
   * lowers the TTL of every store to one second, waits for it to elapse and major-compacts, then
   * asserts that no cell is left in {@link #COLUMN_FAMILY}.
   */
  private void wipeStoreData() throws Exception {
    // Delete all Store information once done using
    for (int i = 0; i < compactionThreshold; i++) {
      Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
      delete.addFamily(COLUMN_FAMILY);
      r.delete(delete);
    }
    r.flush(true);

    // Multiple versions allowed for an entry, so the delete isn't enough
    // Lower TTL and expire to ensure that all our entries have been wiped
    final int ttl = 1000;
    for (HStore store : this.r.stores.values()) {
      ScanInfo old = store.getScanInfo();
      ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
      store.setScanInfo(si);
    }
    Thread.sleep(ttl);

    r.compact(true);
    assertEquals(0, count());
  }

  /**
   * Counts the cells in all store files of {@link #COLUMN_FAMILY_TEXT} by scanning each file from
   * the lowest possible key.
   */
  private int count() throws IOException {
    int count = 0;
    for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
      f.initReader();
      try (StoreFileScanner scanner = f.getPreadScanner(false, Long.MAX_VALUE, 0, false)) {
        scanner.seek(KeyValue.LOWESTKEY);
        while (scanner.next() != null) {
          count++;
        }
      }
    }
    return count;
  }

  /** Adds the standard test content to {@link #COLUMN_FAMILY} and flushes it to a store file. */
  private void createStoreFile(final HRegion region) throws IOException {
    createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
  }

  /** Adds the standard test content to the given family and flushes it to a store file. */
  private void createStoreFile(final HRegion region, String family) throws IOException {
    Table loader = new RegionAsTable(region);
    HTestConst.addContent(loader, family);
    region.flush(true);
  }

  /**
   * Completing a compaction with a corrupt result file must fail with an {@link IOException} and
   * leave the corrupt file in the temp directory.
   */
  @Test
  public void testCompactionWithCorruptResult() throws Exception {
    int nfiles = 10;
    for (int i = 0; i < nfiles; i++) {
      createStoreFile(r);
    }
    HStore store = r.getStore(COLUMN_FAMILY);

    Collection<HStoreFile> storeFiles = store.getStorefiles();
    DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor();
    CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
    tool.compact(request, NoLimitThroughputController.INSTANCE, null);

    // Now lets corrupt the compacted file.
    FileSystem fs = store.getFileSystem();
    // default compaction policy created one and only one new compacted file
    Path tmpPath = store.getRegionFileSystem().createTempName();
    try (FSDataOutputStream stream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) {
      stream.writeChars("CORRUPT FILE!!!!");
    }
    // The complete compaction should fail and the corrupt file should remain
    // in the 'tmp' directory;
    assertThrows(IOException.class, () -> store.doCompaction(null, null, null,
      EnvironmentEdgeManager.currentTime(), Collections.singletonList(tmpPath)));
    assertTrue(fs.exists(tmpPath));
  }

  /**
   * Create a custom compaction request and be sure that we can track it through the queue, knowing
   * when the compaction is completed.
   */
  @Test
  public void testTrackingCompactionRequest() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
      createStoreFile(r);
    }

    CountDownLatch latch = new CountDownLatch(1);
    Tracker tracker = new Tracker(latch);
    thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, tracker, null);
    // wait for the latch to complete.
    latch.await();

    thread.interruptIfNecessary();
  }

  /**
   * A compaction whose follow-up split check throws must still be counted as completed and must
   * additionally be counted as failed in the region metrics.
   */
  @Test
  public void testCompactionFailure() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
      createStoreFile(r);
    }

    HRegion mockRegion = Mockito.spy(r);
    Mockito.when(mockRegion.checkSplit())
      .thenThrow(new RuntimeException("Thrown intentionally by test!"));

    try (MetricsRegionWrapperImpl metricsWrapper = new MetricsRegionWrapperImpl(r)) {

      long preCompletedCount = metricsWrapper.getNumCompactionsCompleted();
      long preFailedCount = metricsWrapper.getNumCompactionsFailed();

      CountDownLatch latch = new CountDownLatch(1);
      Tracker tracker = new Tracker(latch);
      thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER, tracker,
        null);
      // wait for the latch to complete; a timeout must fail here rather than surface later as a
      // misleading metrics mismatch.
      assertTrue("Timed out waiting for the compaction to finish",
        latch.await(120, TimeUnit.SECONDS));

      // compaction should have completed and been marked as failed due to error in split request
      long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
      long postFailedCount = metricsWrapper.getNumCompactionsFailed();

      assertTrue("Completed count should have increased (pre=" + preCompletedCount + ", post="
        + postCompletedCount + ")", postCompletedCount > preCompletedCount);
      assertTrue("Failed count should have increased (pre=" + preFailedCount + ", post="
        + postFailedCount + ")", postFailedCount > preFailedCount);
    }
  }

  /**
   * Test no new Compaction requests are generated after calling stop compactions
   */
  @Test
  public void testStopStartCompaction() throws IOException {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    final CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
      createStoreFile(r);
    }
    thread.switchCompaction(false);
    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, null);
    assertFalse(thread.isCompactionsEnabled());
    int longCompactions = thread.getLongCompactions().getActiveCount();
    int shortCompactions = thread.getShortCompactions().getActiveCount();
    assertEquals(
      "longCompactions=" + longCompactions + "," + "shortCompactions=" + shortCompactions, 0,
      longCompactions + shortCompactions);
    thread.switchCompaction(true);
    assertTrue(thread.isCompactionsEnabled());
    // Make sure no compactions have run.
    assertEquals(0, thread.getLongCompactions().getCompletedTaskCount()
      + thread.getShortCompactions().getCompletedTaskCount());
    // Request a compaction and make sure it is submitted successfully.
    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, null);
    // Wait until the compaction finishes.
    Waiter.waitFor(UTIL.getConfiguration(), 5000,
      (Waiter.Predicate<Exception>) () -> thread.getLongCompactions().getCompletedTaskCount()
        + thread.getShortCompactions().getCompletedTaskCount() == 1);
    // Make sure there are no compactions running.
    assertEquals(0,
      thread.getLongCompactions().getActiveCount() + thread.getShortCompactions().getActiveCount());
  }

  /**
   * Starts a compaction that is parked inside {@link WaitThroughPutController}, switches
   * compactions off while it is running and checks that the store file count did not change.
   */
  @Test
  public void testInterruptingRunningCompactions() throws Exception {
    // setup a compact/split thread on a mock server
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      WaitThroughPutController.class.getName());
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);

    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createPaddedStoreFiles();
    HStore s = r.getStore(COLUMN_FAMILY);
    int initialFiles = s.getStorefilesCount();

    thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, null);

    Thread.sleep(3000);
    thread.switchCompaction(false);
    assertEquals(initialFiles, s.getStorefilesCount());
    // don't mess up future tests
    thread.switchCompaction(true);
  }

  /**
   * HBASE-7947: Regression test to ensure adding to the correct list in the {@link CompactSplit}
   * @throws Exception on failure
   */
  @Test
  public void testMultipleCustomCompactionRequests() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    int numStores = r.getStores().size();
    CountDownLatch latch = new CountDownLatch(numStores);
    Tracker tracker = new Tracker(latch);
    // create some store files and setup requests for each store on which we want to do a
    // compaction
    for (HStore store : r.getStores()) {
      createStoreFile(r, store.getColumnFamilyName());
      createStoreFile(r, store.getColumnFamilyName());
      createStoreFile(r, store.getColumnFamilyName());
      thread.requestCompaction(r, store, "test mulitple custom comapctions", PRIORITY_USER, tracker,
        null);
    }
    // wait for the latch to complete.
    latch.await();

    thread.interruptIfNecessary();
  }

  /**
   * Store mock whose "compactions" only move files between the {@link #notCompacting} and
   * {@link #compacting} lists and record the number of files of every finished compaction in the
   * shared {@code results} list.
   */
  class StoreMockMaker extends StatefulStoreMockMaker {
    public ArrayList<HStoreFile> compacting = new ArrayList<>();
    public ArrayList<HStoreFile> notCompacting = new ArrayList<>();
    private final ArrayList<Integer> results;

    public StoreMockMaker(ArrayList<Integer> results) {
      this.results = results;
    }

    /** Compaction context that "compacts" a fixed list of files without doing any I/O. */
    public class TestCompactionContext extends CompactionContext {

      private List<HStoreFile> selectedFiles;

      public TestCompactionContext(List<HStoreFile> selectedFiles) {
        super();
        this.selectedFiles = selectedFiles;
      }

      @Override
      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
        return new ArrayList<>();
      }

      @Override
      public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
        boolean mayUseOffPeak, boolean forceMajor) throws IOException {
        this.request = new CompactionRequestImpl(selectedFiles);
        this.request.setPriority(getPriority());
        return true;
      }

      @Override
      public List<Path> compact(ThroughputController throughputController, User user)
        throws IOException {
        finishCompaction(this.selectedFiles);
        return new ArrayList<>();
      }
    }

    @Override
    public synchronized Optional<CompactionContext> selectCompaction() {
      // Select everything that is currently not compacting and mark it as compacting.
      CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting));
      compacting.addAll(notCompacting);
      notCompacting.clear();
      try {
        ctx.select(null, false, false, false);
      } catch (IOException ex) {
        fail("Shouldn't happen");
      }
      return Optional.of(ctx);
    }

    @Override
    public synchronized void cancelCompaction(Object object) {
      TestCompactionContext ctx = (TestCompactionContext) object;
      compacting.removeAll(ctx.selectedFiles);
      notCompacting.addAll(ctx.selectedFiles);
    }

    /** Records the size of a finished, non-empty compaction and releases its files. */
    public synchronized void finishCompaction(List<HStoreFile> sfs) {
      if (sfs.isEmpty()) {
        return;
      }
      synchronized (results) {
        results.add(sfs.size());
      }
      compacting.removeAll(sfs);
    }

    @Override
    public int getPriority() {
      return 7 - compacting.size() - notCompacting.size();
    }
  }

  /**
   * Store mock whose compactions block inside {@code compact()} until the test explicitly
   * unblocks them; used to hold the compaction thread while the test manipulates the queue.
   */
  public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
    // Written in selectCompaction() and polled by the test thread in waitForBlocking(), hence
    // volatile.
    volatile BlockingCompactionContext blocked = null;

    /** Compaction context that parks in {@link #compact} until {@link #unblock()} is called. */
    public class BlockingCompactionContext extends CompactionContext {
      public volatile boolean isInCompact = false;
      // Guarded by this context's monitor. Remembers that unblock() was called so that a
      // notification sent before compact() starts waiting is not lost.
      private boolean unblocked = false;

      public void unblock() {
        synchronized (this) {
          unblocked = true;
          this.notifyAll();
        }
      }

      @Override
      public List<Path> compact(ThroughputController throughputController, User user)
        throws IOException {
        try {
          isInCompact = true;
          synchronized (this) {
            // Loop on the condition: protects against spurious wakeups and against unblock()
            // having been called between setting isInCompact and entering the monitor.
            while (!unblocked) {
              this.wait();
            }
          }
        } catch (InterruptedException e) {
          Assume.assumeNoException(e);
        }
        return new ArrayList<>();
      }

      @Override
      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
        return new ArrayList<>();
      }

      @Override
      public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e)
        throws IOException {
        this.request = new CompactionRequestImpl(new ArrayList<>());
        return true;
      }
    }

    @Override
    public Optional<CompactionContext> selectCompaction() {
      BlockingCompactionContext ctx = new BlockingCompactionContext();
      this.blocked = ctx;
      try {
        ctx.select(null, false, false, false);
      } catch (IOException ex) {
        fail("Shouldn't happen");
      }
      return Optional.of(ctx);
    }

    @Override
    public void cancelCompaction(Object object) {
    }

    @Override
    public int getPriority() {
      return Integer.MIN_VALUE; // some invalid value, see createStoreMock
    }

    /**
     * Polls every 50 ms until a blocking compaction has entered {@code compact()}, then hands
     * that context to the caller and forgets it.
     */
    public BlockingCompactionContext waitForBlocking() {
      BlockingCompactionContext ctx = this.blocked;
      while (ctx == null || !ctx.isInCompact) {
        Threads.sleepWithoutInterrupt(50);
        ctx = this.blocked;
      }
      this.blocked = null;
      return ctx;
    }

    @Override
    public HStore createStoreMock(String name) throws Exception {
      return createStoreMock(Integer.MIN_VALUE, name);
    }

    public HStore createStoreMock(int priority, String name) throws Exception {
      // Override the mock to always return the specified priority.
      HStore s = super.createStoreMock(name);
      when(s.getCompactPriority()).thenReturn(priority);
      return s;
    }
  }

  /** Test compaction priority management and multiple compactions per store (HBASE-8665). */
  @Test
  public void testCompactionQueuePriorities() throws Exception {
    // Setup a compact/split thread on a mock server.
    final Configuration conf = HBaseConfiguration.create();
    HRegionServer mockServer = mock(HRegionServer.class);
    when(mockServer.isStopped()).thenReturn(false);
    when(mockServer.getConfiguration()).thenReturn(conf);
    when(mockServer.getChoreService()).thenReturn(new ChoreService("test"));
    CompactSplit cst = new CompactSplit(mockServer);
    when(mockServer.getCompactSplitThread()).thenReturn(cst);
    // prevent large compaction thread pool stealing job from small compaction queue.
    cst.shutdownLongCompactions();
    // Set up the region mock that redirects compactions.
    HRegion r = mock(HRegion.class);
    when(r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() {
      @Override
      public Boolean answer(InvocationOnMock invocation) throws Throwable {
        invocation.<CompactionContext> getArgument(0).compact(invocation.getArgument(2), null);
        return true;
      }
    });

    // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
    ArrayList<Integer> results = new ArrayList<>();
    StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
    HStore store = sm.createStoreMock("store1");
    HStore store2 = sm2.createStoreMock("store2");
    BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();

    // First, block the compaction thread so that we could muck with queue.
    cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
    BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();

    // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
    for (int i = 0; i < 4; ++i) {
      sm.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store, "s1-pri3");
    for (int i = 0; i < 3; ++i) {
      sm2.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store2, "s2-pri4");
    // Now add 2 more files to store1 and queue compaction - pri 1.
    for (int i = 0; i < 2; ++i) {
      sm.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store, "s1-pri1");
    // Finally add blocking compaction with priority 2.
    cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");

    // Unblock the blocking compaction; we should run pri1 and become block again in pri2.
    currentBlock.unblock();
    currentBlock = blocker.waitForBlocking();
    // Pri1 should have "compacted" all 6 files.
    assertEquals(1, results.size());
    assertEquals(6, results.get(0).intValue());
    // Add 2 files to store 1 (it has 2 files now).
    for (int i = 0; i < 2; ++i) {
      sm.notCompacting.add(createFile());
    }
    // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority
    // is 5, however, so it must not preempt store 2. Add blocking compaction at the end.
    cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
    currentBlock.unblock();
    currentBlock = blocker.waitForBlocking();
    assertEquals(3, results.size());
    assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files.
    assertEquals(2, results.get(2).intValue());

    currentBlock.unblock();
    cst.interruptIfNecessary();
  }

  /**
   * Firstly write 10 cells (with different time stamp) to a qualifier and flush to hfile1, then
   * write 10 cells (with different time stamp) to the same qualifier and flush to hfile2. The
   * latest cell (cell-A) in hfile1 and the oldest cell (cell-B) in hfile2 are with the same time
   * stamp but different sequence id, and will get scanned successively during compaction.
   * <p/>
   * We set compaction.kv.max to 10 so compaction will scan 10 versions each round, meanwhile we set
   * keepSeqIdPeriod=0 in {@link DummyCompactor} so all 10 versions of hfile2 will be written out
   * with seqId cleaned (set to 0) including cell-B. When the scanner then reaches cell-A, this
   * caused a scan out-of-order assertion error before HBASE-16931.
   */
  @Test
  public void testCompactionSeqId() throws Exception {
    final byte[] ROW = Bytes.toBytes("row");
    final byte[] QUALIFIER = Bytes.toBytes("qualifier");

    long timestamp = 10000;

    // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9
    // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8
    // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7
    // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6
    // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5
    // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4
    // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3
    // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2
    // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1
    // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0
    for (int i = 0; i < 10; i++) {
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
      r.put(put);
    }
    r.flush(true);

    // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18
    // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17
    // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16
    // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15
    // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14
    // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13
    // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12
    // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11
    // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10
    // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9
    for (int i = 18; i > 8; i--) {
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
      r.put(put);
    }
    r.flush(true);
    r.compact(true);
  }

  /** {@link DefaultCompactor} with {@code keepSeqIdPeriod} forced to 0. */
  public static class DummyCompactor extends DefaultCompactor {
    public DummyCompactor(Configuration conf, HStore store) {
      super(conf, store);
      this.keepSeqIdPeriod = 0;
    }
  }

  /** Creates a mocked store file at path {@code file} whose reader reports a length of 10. */
  private static HStoreFile createFile() throws Exception {
    HStoreFile sf = mock(HStoreFile.class);
    when(sf.getPath()).thenReturn(new Path("file"));
    StoreFileReader r = mock(StoreFileReader.class);
    when(r.length()).thenReturn(10L);
    when(sf.getReader()).thenReturn(r);
    return sf;
  }

  /**
   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
   * finishes.
   */
  public static class Tracker implements CompactionLifeCycleTracker {

    private final CountDownLatch done;

    public Tracker(CountDownLatch done) {
      this.done = done;
    }

    @Override
    public void afterExecution(Store store) {
      done.countDown();
    }
  }

  /**
   * {@link ThroughputController} whose {@link #control(String, long)} sleeps for 6,000,000 ms, so
   * a compaction that reaches throughput control stays parked there until its thread is
   * interrupted.
   */
  public static class WaitThroughPutController extends NoLimitThroughputController {

    public WaitThroughPutController() {
    }

    @Override
    public long control(String compactionName, long size) throws InterruptedException {
      Thread.sleep(6000000);
      return 6000000;
    }
  }
}