/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * Tests for {@link CleanerChore}, driven through the {@link AllValidPaths} subclass (which accepts
 * every path) together with the trivial delegates declared at the bottom of this class.
 * <p>
 * All tests share the static {@link HBaseTestingUtil}, and therefore the same
 * {@link Configuration} instance and the same test directory.
 */
@Category({ MasterTests.class, SmallTests.class })
public class TestCleanerChore {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCleanerChore.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestCleanerChore.class);
  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static DirScanPool POOL;
  private static ChoreService SERVICE;

  /** Creates the scan pool and the chore service shared by every test in this class. */
  @BeforeClass
  public static void setup() {
    POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
    SERVICE = new ChoreService("cleaner", 2, true);
  }

  /** Shuts down the shared chore service and scan pool and removes the test directory. */
  @AfterClass
  public static void cleanup() throws Exception {
    SERVICE.shutdown();
    UTIL.cleanupTestDir();
    POOL.shutdownNow();
  }

  /** With a {@link NeverDelete} delegate, running the chore must leave files and directories. */
  @Test
  public void testSavesFilesOnRequest() throws Exception {
    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, NeverDelete.class.getName());

    AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);

    // create the directory layout in the directory to clean
    Path parent = new Path(testDir, "parent");
    Path file = new Path(parent, "someFile");
    fs.mkdirs(parent);
    // touch a new file
    fs.create(file).close();
    assertTrue("Test file didn't get created.", fs.exists(file));

    // run the chore
    chore.chore();

    // verify all the files were preserved
    assertTrue("File shouldn't have been deleted", fs.exists(file));
    assertTrue("directory shouldn't have been deleted", fs.exists(parent));
  }

  /**
   * Injects an {@link IOException} into {@code listStatus}, checks that the triggered cleaner run
   * reports the failure and deletes nothing, then removes the fault and checks that a later run
   * succeeds and removes the file and its directory.
   */
  @Test
  public void retriesIOExceptionInStatus() throws Exception {
    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    String confKey = "hbase.test.cleaner.delegates";
    // Configure the delegate explicitly: the Configuration is shared by every test in this class,
    // so otherwise the delegate in effect would be whatever the previously run test left behind.
    conf.set(confKey, AlwaysDelete.class.getName());

    Path child = new Path(testDir, "child");
    Path file = new Path(child, "file");
    fs.mkdirs(child);
    fs.create(file).close();
    assertTrue("test file didn't get created.", fs.exists(file));
    final AtomicBoolean fails = new AtomicBoolean(true);
    FilterFileSystem filtered = new FilterFileSystem(fs) {
      @Override
      public FileStatus[] listStatus(Path f) throws IOException {
        if (fails.get()) {
          throw new IOException("whomp whomp.");
        }
        return fs.listStatus(f);
      }
    };

    AllValidPaths chore =
      new AllValidPaths("test-retry-ioe", stop, conf, filtered, testDir, confKey, POOL);
    SERVICE.scheduleChore(chore);
    try {
      // trouble talking to the filesystem
      // and verify that it accurately reported the failure.
      CompletableFuture<Boolean> errorFuture = chore.triggerCleanerNow();
      ExecutionException e = assertThrows(ExecutionException.class, () -> errorFuture.get());
      assertThat(e.getCause(), instanceOf(IOException.class));
      assertThat(e.getCause().getMessage(), containsString("whomp"));

      // verify that it couldn't clean the files.
      assertTrue("test rig failed to inject failure.", fs.exists(file));
      assertTrue("test rig failed to inject failure.", fs.exists(child));

      // filesystem is back
      fails.set(false);
      for (;;) {
        CompletableFuture<Boolean> succFuture = chore.triggerCleanerNow();
        // the reset of the future is async, so it is possible that we get the previous future
        // again.
        if (succFuture != errorFuture) {
          // verify that it accurately reported success.
          assertTrue("chore should claim it succeeded.", succFuture.get());
          break;
        }
      }
      // verify everything is gone.
      assertFalse("file should have been destroyed.", fs.exists(file));
      assertFalse("directory should have been destroyed.", fs.exists(child));

    } finally {
      chore.cancel();
    }
  }

  /** With an {@link AlwaysDelete} delegate, files and the directories they leave empty go away. */
  @Test
  public void testDeletesEmptyDirectories() throws Exception {
    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());

    AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);

    // create the directory layout in the directory to clean
    Path parent = new Path(testDir, "parent");
    Path child = new Path(parent, "child");
    Path emptyChild = new Path(parent, "emptyChild");
    Path file = new Path(child, "someFile");
    fs.mkdirs(child);
    fs.mkdirs(emptyChild);
    // touch a new file
    fs.create(file).close();
    // also create a file in the top level directory
    Path topFile = new Path(testDir, "topFile");
    fs.create(topFile).close();
    assertTrue("Test file didn't get created.", fs.exists(file));
    assertTrue("Test file didn't get created.", fs.exists(topFile));

    // run the chore
    chore.chore();

    // verify all the files got deleted
    assertFalse("File didn't get deleted", fs.exists(topFile));
    assertFalse("File didn't get deleted", fs.exists(file));
    assertFalse("Empty directory didn't get deleted", fs.exists(child));
    assertFalse("Empty directory didn't get deleted", fs.exists(parent));
  }

  /**
   * Test to make sure that we don't attempt to ask the delegate whether or not we should preserve a
   * directory.
   * @throws Exception on failure
   */
  @Test
  public void testDoesNotCheckDirectories() throws Exception {
    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());

    AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);
    // spy on the delegate to ensure that we don't check for directories
    AlwaysDelete delegate = (AlwaysDelete) chore.cleanersChain.get(0);
    AlwaysDelete spy = Mockito.spy(delegate);
    chore.cleanersChain.set(0, spy);

    // create the directory layout in the directory to clean
    Path parent = new Path(testDir, "parent");
    Path file = new Path(parent, "someFile");
    fs.mkdirs(parent);
    assertTrue("Test parent didn't get created.", fs.exists(parent));
    // touch a new file
    fs.create(file).close();
    assertTrue("Test file didn't get created.", fs.exists(file));

    FileStatus fStat = fs.getFileStatus(parent);
    chore.chore();
    // make sure we never checked the directory
    Mockito.verify(spy, Mockito.never()).isFileDeletable(fStat);
    Mockito.reset(spy);
  }

  /** A chore whose {@link Stoppable} has already been stopped must not delete anything. */
  @Test
  public void testStoppedCleanerDoesNotDeleteFiles() throws Exception {
    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());

    AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);

    // also create a file in the top level directory
    Path topFile = new Path(testDir, "topFile");
    fs.create(topFile).close();
    assertTrue("Test file didn't get created.", fs.exists(topFile));

    // stop the chore
    stop.stop("testing stop");

    // run the chore
    chore.chore();

    // test that the file still exists
    assertTrue("File got deleted while chore was stopped", fs.exists(topFile));
  }

  /**
   * While cleaning a directory, all the files in the directory may be deleted, but there may be
   * another file added, in which case the directory shouldn't be deleted.
   * @throws IOException on failure
   */
  @Test
  public void testCleanerDoesNotDeleteDirectoryWithLateAddedFiles() throws IOException {
    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    final Path testDir = UTIL.getDataTestDir();
    final FileSystem fs = UTIL.getTestFileSystem();
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());

    AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);
    // spy on the delegate so that a file can be added while it is being consulted
    AlwaysDelete delegate = (AlwaysDelete) chore.cleanersChain.get(0);
    AlwaysDelete spy = Mockito.spy(delegate);
    chore.cleanersChain.set(0, spy);

    // create the directory layout in the directory to clean
    final Path parent = new Path(testDir, "parent");
    Path file = new Path(parent, "someFile");
    fs.mkdirs(parent);
    // touch a new file
    fs.create(file).close();
    assertTrue("Test file didn't get created.", fs.exists(file));
    final Path addedFile = new Path(parent, "addedFile");

    // when we attempt to delete the original file, add another file in the same directory
    Mockito.doAnswer(new Answer<Boolean>() {
      @Override
      public Boolean answer(InvocationOnMock invocation) throws Throwable {
        fs.create(addedFile).close();
        CommonFSUtils.logFileSystemState(fs, testDir, LOG);
        return (Boolean) invocation.callRealMethod();
      }
    }).when(spy).isFileDeletable(Mockito.any());

    // run the chore
    chore.chore();

    // make sure all the directories + added file exist, but the original file is deleted
    assertTrue("Added file unexpectedly deleted", fs.exists(addedFile));
    assertTrue("Parent directory deleted unexpectedly", fs.exists(parent));
    assertFalse("Original file unexpectedly retained", fs.exists(file));
    Mockito.verify(spy, Mockito.times(1)).isFileDeletable(Mockito.any());
    Mockito.reset(spy);
  }

  /**
   * The cleaner runs in a loop, where it first checks to see whether all the files under a
   * directory can be deleted. If they all can, then we try to delete the directory. However, a file
   * may be added to that directory after the original check. This ensures that we don't
   * accidentally delete that directory and don't get spurious IOExceptions.
   * <p>
   * This was from HBASE-7465.
   * @throws Exception on failure
   */
  @Test
  public void testNoExceptionFromDirectoryWithRacyChildren() throws Exception {
    UTIL.cleanupTestDir();
    Stoppable stop = new StoppableImplementation();
    // need to use a localutil to not break the rest of the test that runs on the local FS, which
    // gets hosed when we start to use a minicluster.
    HBaseTestingUtil localUtil = new HBaseTestingUtil();
    Configuration conf = localUtil.getConfiguration();
    final Path testDir = UTIL.getDataTestDir();
    final FileSystem fs = UTIL.getTestFileSystem();
    LOG.debug("Writing test data to: {}", testDir);
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());

    AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);
    // spy on the delegate so that a file can be added while it is being consulted
    AlwaysDelete delegate = (AlwaysDelete) chore.cleanersChain.get(0);
    AlwaysDelete spy = Mockito.spy(delegate);
    chore.cleanersChain.set(0, spy);

    // create the directory layout in the directory to clean
    final Path parent = new Path(testDir, "parent");
    Path file = new Path(parent, "someFile");
    fs.mkdirs(parent);
    // touch a new file
    fs.create(file).close();
    assertTrue("Test file didn't get created.", fs.exists(file));
    final Path racyFile = new Path(parent, "addedFile");

    // when we attempt to delete the original file, add another file in the same directory
    Mockito.doAnswer(new Answer<Boolean>() {
      @Override
      public Boolean answer(InvocationOnMock invocation) throws Throwable {
        fs.create(racyFile).close();
        CommonFSUtils.logFileSystemState(fs, testDir, LOG);
        return (Boolean) invocation.callRealMethod();
      }
    }).when(spy).isFileDeletable(Mockito.any());

    // run the chore
    chore.chore();

    // make sure all the directories + added file exist, but the original file is deleted
    assertTrue("Added file unexpectedly deleted", fs.exists(racyFile));
    assertTrue("Parent directory deleted unexpectedly", fs.exists(parent));
    assertFalse("Original file unexpectedly retained", fs.exists(file));
    Mockito.verify(spy, Mockito.times(1)).isFileDeletable(Mockito.any());
  }

  /** An explicitly enabled chore deletes the file and the directories that become empty. */
  @Test
  public void testDeleteFileWithCleanerEnabled() throws Exception {
    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());

    AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);

    // Enable cleaner
    chore.setEnabled(true);

    // create the directory layout in the directory to clean
    Path parent = new Path(testDir, "parent");
    Path child = new Path(parent, "child");
    Path file = new Path(child, "someFile");
    fs.mkdirs(child);

    // touch a new file
    fs.create(file).close();
    assertTrue("Test file didn't get created.", fs.exists(file));

    // run the chore
    chore.chore();

    // verify all the files got deleted
    assertFalse("File didn't get deleted", fs.exists(file));
    assertFalse("Empty directory didn't get deleted", fs.exists(child));
    assertFalse("Empty directory didn't get deleted", fs.exists(parent));
  }

  /** A disabled chore leaves files and directories alone even with {@link AlwaysDelete}. */
  @Test
  public void testDeleteFileWithCleanerDisabled() throws Exception {
    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());

    AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);

    // Disable cleaner
    chore.setEnabled(false);

    // create the directory layout in the directory to clean
    Path parent = new Path(testDir, "parent");
    Path child = new Path(parent, "child");
    Path file = new Path(child, "someFile");
    fs.mkdirs(child);

    // touch a new file
    fs.create(file).close();
    assertTrue("Test file didn't get created.", fs.exists(file));

    // run the chore
    chore.chore();

    // verify all the files exist
    assertTrue("File got deleted with cleaner disabled", fs.exists(file));
    assertTrue("Directory got deleted", fs.exists(child));
    assertTrue("Directory got deleted", fs.exists(parent));
  }

  /**
   * Changes {@link CleanerChore#CHORE_POOL_SIZE} while the chore is running on the shared pool and
   * checks that the chore reports the new pool size. Does nothing on a single-processor machine.
   */
  @Test
  public void testOnConfigurationChange() throws Exception {
    int availableProcessorNum = Runtime.getRuntime().availableProcessors();
    if (availableProcessorNum == 1) { // no need to run this test
      return;
    }

    // have at least 2 available processors/cores
    int initPoolSize = availableProcessorNum / 2;
    int changedPoolSize = availableProcessorNum;

    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());
    conf.set(CleanerChore.CHORE_POOL_SIZE, String.valueOf(initPoolSize));
    AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);
    chore.setEnabled(true);
    // Create subdirs under testDir
    int dirNums = 6;
    Path[] subdirs = new Path[dirNums];
    for (int i = 0; i < dirNums; i++) {
      subdirs[i] = new Path(testDir, "subdir-" + i);
      fs.mkdirs(subdirs[i]);
    }
    // Under each subdirs create 6 files
    for (Path subdir : subdirs) {
      createFiles(fs, subdir, 6);
    }
    // Start chore
    Thread t = new Thread(() -> chore.chore());
    t.setDaemon(true);
    t.start();
    // Change size of chore's pool
    conf.set(CleanerChore.CHORE_POOL_SIZE, String.valueOf(changedPoolSize));
    POOL.onConfigurationChange(conf);
    assertEquals(changedPoolSize, chore.getChorePoolSize());
    // Stop chore
    t.join();
  }

  /**
   * Same scenario as {@link #testOnConfigurationChange()}, but against a dedicated log cleaner scan
   * pool sized through {@link CleanerChore#LOG_CLEANER_CHORE_SIZE}.
   */
  @Test
  public void testOnConfigurationChangeLogCleaner() throws Exception {
    int availableProcessorNum = Runtime.getRuntime().availableProcessors();
    if (availableProcessorNum == 1) { // no need to run this test
      return;
    }

    DirScanPool pool = DirScanPool.getLogCleanerScanPool(UTIL.getConfiguration());
    try {
      // have at least 2 available processors/cores
      int initPoolSize = availableProcessorNum / 2;
      int changedPoolSize = availableProcessorNum;

      Stoppable stop = new StoppableImplementation();
      Configuration conf = UTIL.getConfiguration();
      Path testDir = UTIL.getDataTestDir();
      FileSystem fs = UTIL.getTestFileSystem();
      String confKey = "hbase.test.cleaner.delegates";
      conf.set(confKey, AlwaysDelete.class.getName());
      conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, String.valueOf(initPoolSize));
      final AllValidPaths chore =
        new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, pool);
      chore.setEnabled(true);
      // Create subdirs under testDir
      int dirNums = 6;
      Path[] subdirs = new Path[dirNums];
      for (int i = 0; i < dirNums; i++) {
        subdirs[i] = new Path(testDir, "subdir-" + i);
        fs.mkdirs(subdirs[i]);
      }
      // Under each subdirs create 6 files
      for (Path subdir : subdirs) {
        createFiles(fs, subdir, 6);
      }
      // Start chore
      Thread t = new Thread(() -> chore.chore());
      t.setDaemon(true);
      t.start();
      // Change size of chore's pool
      conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, String.valueOf(changedPoolSize));
      pool.onConfigurationChange(conf);
      assertEquals(changedPoolSize, chore.getChorePoolSize());
      // Stop chore
      t.join();
    } finally {
      // this pool is private to the test, so release it here; @AfterClass only handles POOL
      pool.shutdownNow();
    }
  }

  /** Checks the bounds applied by {@link CleanerChore#calculatePoolSize(String)}. */
  @Test
  public void testMinimumNumberOfThreads() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());
    conf.set(CleanerChore.CHORE_POOL_SIZE, "2");
    int numProcs = Runtime.getRuntime().availableProcessors();
    // Sanity
    assertEquals(numProcs, CleanerChore.calculatePoolSize(Integer.toString(numProcs)));
    // The implementation does not allow us to set more threads than we have processors
    assertEquals(numProcs, CleanerChore.calculatePoolSize(Integer.toString(numProcs + 2)));
    // Force us into the branch that is multiplying 0.0 against the number of processors
    assertEquals(1, CleanerChore.calculatePoolSize("0.0"));
  }

  /** A manually triggered run completes successfully both before and after disabling the chore. */
  @Test
  public void testTriggerCleaner() throws Exception {
    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    fs.mkdirs(testDir);
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, AlwaysDelete.class.getName());
    final AllValidPaths chore =
      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, POOL);
    try {
      SERVICE.scheduleChore(chore);
      assertTrue(chore.triggerCleanerNow().get());
      chore.setEnabled(false);
      // should still be runnable
      assertTrue(chore.triggerCleanerNow().get());
    } finally {
      chore.cancel();
    }
  }

  /**
   * Triggers the chore many times in quick succession and uses the {@link GetConcurrency} delegate
   * to check that no two cleaner runs ever overlapped.
   */
  @Test
  public void testRescheduleNoConcurrencyRun() throws Exception {
    Stoppable stop = new StoppableImplementation();
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDir();
    FileSystem fs = UTIL.getTestFileSystem();
    fs.mkdirs(testDir);
    fs.createNewFile(new Path(testDir, "test"));
    String confKey = "hbase.test.cleaner.delegates";
    conf.set(confKey, GetConcurrency.class.getName());
    AtomicInteger maxConcurrency = new AtomicInteger();
    final AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir,
      confKey, POOL, ImmutableMap.of("maxConcurrency", maxConcurrency));
    try {
      SERVICE.scheduleChore(chore);
      for (int i = 0; i < 100; i++) {
        chore.triggerNow();
        Thread.sleep(5 + ThreadLocalRandom.current().nextInt(5));
      }
      Thread.sleep(1000);
      // set a barrier here to make sure that the previous runs are also finished
      assertFalse(chore.triggerCleanerNow().get());
      // make sure we do not have multiple cleaner runs at the same time
      assertEquals(1, maxConcurrency.get());
    } finally {
      chore.cancel();
    }
  }

  /**
   * Creates {@code numOfFiles} files named {@code file-0}, {@code file-1}, ... under
   * {@code parentDir}, each filled with 1 to 3 MB of random bytes.
   * @param fs         file system to create the files on
   * @param parentDir  directory that receives the files
   * @param numOfFiles number of files to create
   * @throws IOException if creating or writing a file fails
   */
  private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
    // one reusable 1 MB buffer; it is refilled with random bytes before every write
    byte[] megabyte = new byte[1024 * 1024];
    for (int i = 0; i < numOfFiles; i++) {
      int xMega = 1 + ThreadLocalRandom.current().nextInt(3); // size of each file is between 1~3M
      try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
        for (int m = 0; m < xMega; m++) {
          Bytes.random(megabyte);
          fsdos.write(megabyte);
        }
      }
    }
  }

  /** Cleaner chore under test: treats every path as valid and uses the largest possible period. */
  private static class AllValidPaths extends CleanerChore<BaseHFileCleanerDelegate> {

    public AllValidPaths(String name, Stoppable s, Configuration conf, FileSystem fs,
      Path oldFileDir, String confkey, DirScanPool pool) {
      super(name, Integer.MAX_VALUE, s, conf, fs, oldFileDir, confkey, pool);
    }

    public AllValidPaths(String name, Stoppable s, Configuration conf, FileSystem fs,
      Path oldFileDir, String confkey, DirScanPool pool, Map<String, Object> params) {
      super(name, Integer.MAX_VALUE, s, conf, fs, oldFileDir, confkey, pool, params, null);
    }

    // all paths are valid
    @Override
    protected boolean validate(Path file) {
      return true;
    }
  }

  /** Delegate that reports every file as deletable. */
  public static class AlwaysDelete extends BaseHFileCleanerDelegate {
    @Override
    public boolean isFileDeletable(FileStatus fStat) {
      return true;
    }
  }

  /** Delegate that reports every file as not deletable. */
  public static class NeverDelete extends BaseHFileCleanerDelegate {
    @Override
    public boolean isFileDeletable(FileStatus fStat) {
      return false;
    }
  }

  /**
   * Delegate that counts how many cleans are in progress between {@link #preClean()} and
   * {@link #postClean()} and publishes the highest count seen through the {@code maxConcurrency}
   * counter supplied in the init parameters. It never allows a file to be deleted.
   */
  public static class GetConcurrency extends BaseHFileCleanerDelegate {

    private final AtomicInteger concurrency = new AtomicInteger();

    private AtomicInteger maxConcurrency;

    @Override
    public void init(Map<String, Object> params) {
      maxConcurrency = (AtomicInteger) params.get("maxConcurrency");
    }

    @Override
    public void preClean() {
      int running = concurrency.incrementAndGet();
      // atomically raise the recorded maximum if this run pushed the count above it
      maxConcurrency.accumulateAndGet(running, Math::max);
    }

    @Override
    public void postClean() {
      concurrency.decrementAndGet();
    }

    @Override
    protected boolean isFileDeletable(FileStatus fStat) {
      // sleep a while to slow down the process
      Threads.sleepWithoutInterrupt(10 + ThreadLocalRandom.current().nextInt(10));
      return false;
    }
  }
}