/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

/**
 * Verifies that the {@link CompactedHFilesDischarger} chore removes compacted store files only
 * once no scanner holds a reference to them.
 */
@Category({ MediumTests.class, RegionServerTests.class })
public class TestCompactedHFilesDischarger {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCompactedHFilesDischarger.class);

  private final HBaseTestingUtil testUtil = new HBaseTestingUtil();
  private HRegion region;
  private static final byte[] fam = Bytes.toBytes("cf_1");
  private static final byte[] qual1 = Bytes.toBytes("qf_1");
  private static final byte[] val = Bytes.toBytes("val");
  private static CountDownLatch latch = new CountDownLatch(3);
  private static AtomicInteger counter = new AtomicInteger(0);
  private static AtomicInteger scanCompletedCounter = new AtomicInteger(0);
  private RegionServerServices rss;

  @Before
  public void setUp() throws Exception {
    TableName tableName = TableName.valueOf(getClass().getSimpleName());
    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam)).build();
    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
    Path path = testUtil.getDataTestDir(getClass().getSimpleName());
    region =
      HBaseTestingUtil.createRegionAndWAL(info, path, testUtil.getConfiguration(), tableDescriptor);
    rss = mock(RegionServerServices.class);
    List<HRegion> regions = new ArrayList<>(1);
    regions.add(region);
    Mockito.doReturn(regions).when(rss).getRegions();
  }

  @After
  public void tearDown() throws IOException {
    counter.set(0);
    scanCompletedCounter.set(0);
    latch = new CountDownLatch(3);
    HBaseTestingUtil.closeRegionAndWAL(region);
    testUtil.cleanupTestDir();
  }

  @Test
  public void testCompactedHFilesCleaner() throws Exception {
    // Create the cleaner object
    CompactedHFilesDischarger cleaner =
      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
    // Add some data to the region and do some flushes
    for (int i = 1; i < 10; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);
    for (int i = 11; i < 20; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);
    for (int i = 21; i < 30; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);

    HStore store = region.getStore(fam);
    assertEquals(3, store.getStorefilesCount());

    Collection<HStoreFile> storefiles = store.getStorefiles();
    Collection<HStoreFile> compactedfiles =
      store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    // None of the files should be in compacted state.
    for (HStoreFile file : storefiles) {
      assertFalse(file.isCompactedAway());
    }
    // Try to run the cleaner without a compaction; there should not be any change
    cleaner.chore();
    storefiles = store.getStorefiles();
    // None of the files should be in compacted state.
    for (HStoreFile file : storefiles) {
      assertFalse(file.isCompactedAway());
    }
    // now do some compaction
    region.compact(true);
    // The flushed files should still be present until the cleaner runs, but their state
    // should be COMPACTED
    assertEquals(1, store.getStorefilesCount());
    assertEquals(3, store.getStoreEngine().getStoreFileManager().getCompactedfiles().size());

    // Run the cleaner
    cleaner.chore();
    assertEquals(1, store.getStorefilesCount());
    storefiles = store.getStorefiles();
    for (HStoreFile file : storefiles) {
      // Should not be in compacted state
      assertFalse(file.isCompactedAway());
    }
    compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    assertTrue(compactedfiles.isEmpty());
  }

  @Test
  public void testCleanerWithParallelScannersAfterCompaction() throws Exception {
    // Create the cleaner object
    CompactedHFilesDischarger cleaner =
      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
    // Add some data to the region and do some flushes
    for (int i = 1; i < 10; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);
    for (int i = 11; i < 20; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);
    for (int i = 21; i < 30; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);

    HStore store = region.getStore(fam);
    assertEquals(3, store.getStorefilesCount());

    Collection<HStoreFile> storefiles = store.getStorefiles();
    Collection<HStoreFile> compactedfiles =
      store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    // None of the files should be in compacted state.
    for (HStoreFile file : storefiles) {
      assertFalse(file.isCompactedAway());
    }
    // Do compaction
    region.compact(true);
    startScannerThreads();

    storefiles = store.getStorefiles();
    int usedReaderCount = 0;
    int unusedReaderCount = 0;
    // The scanners were opened after the compaction, so only the new file is referenced
    for (HStoreFile file : storefiles) {
      if (file.getRefCount() == 3) {
        usedReaderCount++;
      }
    }
    compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    for (HStoreFile file : compactedfiles) {
      assertEquals("Refcount should be 0", 0, file.getRefCount());
      unusedReaderCount++;
    }
    // Though the compacted files are still present, no reads are using them
    assertEquals("unused reader count should be 3", 3, unusedReaderCount);
    assertEquals("used reader count should be 1", 1, usedReaderCount);
    // now run the cleaner
    cleaner.chore();
    countDown();
    assertEquals(1, store.getStorefilesCount());
    storefiles = store.getStorefiles();
    for (HStoreFile file : storefiles) {
      // Should not be in compacted state
      assertFalse(file.isCompactedAway());
    }
    compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    assertTrue(compactedfiles.isEmpty());
  }

  @Test
  public void testCleanerWithParallelScanners() throws Exception {
    // Create the cleaner object
    CompactedHFilesDischarger cleaner =
      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
    // Add some data to the region and do some flushes
    for (int i = 1; i < 10; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);
    for (int i = 11; i < 20; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);
    for (int i = 21; i < 30; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);

    HStore store = region.getStore(fam);
    assertEquals(3, store.getStorefilesCount());

    Collection<HStoreFile> storefiles = store.getStorefiles();
    Collection<HStoreFile> compactedfiles =
      store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    // None of the files should be in compacted state.
    for (HStoreFile file : storefiles) {
      assertFalse(file.isCompactedAway());
    }
    startScannerThreads();
    // Do compaction
    region.compact(true);

    storefiles = store.getStorefiles();
    int usedReaderCount = 0;
    int unusedReaderCount = 0;
    for (HStoreFile file : storefiles) {
      if (file.getRefCount() == 0) {
        unusedReaderCount++;
      }
    }
    // The scanners were opened before the compaction, so they still reference the old files
    compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    for (HStoreFile file : compactedfiles) {
      assertEquals("Refcount should be 3", 3, file.getRefCount());
      usedReaderCount++;
    }
    // The newly compacted file will not be used by any scanner
    assertEquals("unused reader count should be 1", 1, unusedReaderCount);
    assertEquals("used reader count should be 3", 3, usedReaderCount);
    // now run the cleaner
    cleaner.chore();
    countDown();
    // No compacted file could be cleaned up because the scanners still reference them,
    // so the counts are unchanged
    assertEquals(1, store.getStorefilesCount());
    assertEquals(3, store.getStoreEngine().getStoreFileManager().getCompactedfiles().size());
    while (scanCompletedCounter.get() != 3) {
      Thread.sleep(100);
    }
    // reset
    latch = new CountDownLatch(3);
    scanCompletedCounter.set(0);
    counter.set(0);
    // A new scanner should use only the file created by the compaction
    startScannerThreads();
    storefiles = store.getStorefiles();
    usedReaderCount = 0;
    unusedReaderCount = 0;
    for (HStoreFile file : storefiles) {
      if (file.getRefCount() == 3) {
        usedReaderCount++;
      }
    }
    compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    for (HStoreFile file : compactedfiles) {
      assertEquals("Refcount should be 0", 0, file.getRefCount());
      unusedReaderCount++;
    }
    // Though the compacted files are still present, no reads are using them
    assertEquals("unused reader count should be 3", 3, unusedReaderCount);
    assertEquals("used reader count should be 1", 1, usedReaderCount);
    countDown();
    while (scanCompletedCounter.get() != 3) {
      Thread.sleep(100);
    }
    // Run the cleaner again; now it should be able to clear the compacted files because
    // there are no active readers on them
    cleaner.chore();
    assertEquals(1, store.getStorefilesCount());
    storefiles = store.getStorefiles();
    for (HStoreFile file : storefiles) {
      // Should not be in compacted state
      assertFalse(file.isCompactedAway());
    }
    compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    assertTrue(compactedfiles.isEmpty());
  }

  @Test
  public void testStoreFileMissing() throws Exception {
    // Write 3 records and create 3 store files.
    write("row1");
    region.flush(true);
    write("row2");
    region.flush(true);
    write("row3");
    region.flush(true);

    Scan scan = new Scan();
    scan.setCaching(1);
    RegionScanner scanner = region.getScanner(scan);
    List<Cell> res = new ArrayList<>();
    // Read the first row
    scanner.next(res);
    assertEquals("row1", Bytes.toString(CellUtil.cloneRow(res.get(0))));
    res.clear();
    // Create a new store file between two scanner.next() calls
    write("row4");
    region.flush(true);

    // Compact the region
    region.compact(true);

    // Create the cleaner object and discharge the compacted files
    CompactedHFilesDischarger cleaner =
      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
    cleaner.chore();
    // The scan should still continue correctly after the compacted files are gone
    scanner.next(res);
    assertEquals("row2", Bytes.toString(CellUtil.cloneRow(res.get(0))));

    scanner.close();
  }

  private void write(String rowKey) throws IOException {
    byte[] row = Bytes.toBytes(rowKey);
    Put put = new Put(row);
    put.addColumn(fam, qual1, row);
    region.put(put);
  }

  protected void countDown() {
    // Count down three times to release all scanner threads
    latch.countDown();
    latch.countDown();
    latch.countDown();
  }

  protected void startScannerThreads() throws InterruptedException {
    // Start parallel scan threads and wait until each has read its first row
    ScanThread[] scanThreads = new ScanThread[3];
    for (int i = 0; i < 3; i++) {
      scanThreads[i] = new ScanThread(region);
    }
    for (ScanThread thread : scanThreads) {
      thread.start();
    }
    while (counter.get() != 3) {
      Thread.sleep(100);
    }
  }

  /**
   * Opens a scanner on the region, reads one row, then blocks on the shared latch before
   * draining the rest of the scan.
   */
  private static class ScanThread extends Thread {
    private final HRegion region;

    public ScanThread(HRegion region) {
      this.region = region;
    }

    @Override
    public void run() {
      try {
        initiateScan(region);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }

    private void initiateScan(HRegion region) throws IOException {
      Scan scan = new Scan();
      scan.setCaching(1);
      RegionScanner resScanner = null;
      try {
        resScanner = region.getScanner(scan);
        List<Cell> results = new ArrayList<>();
        boolean next = resScanner.next(results);
        try {
          // Signal that the scanner is open, then wait for the test to release us
          counter.incrementAndGet();
          latch.await();
        } catch (InterruptedException e) {
          // Ignore the interrupt and continue the scan
        }
        while (next) {
          next = resScanner.next(results);
        }
      } finally {
        scanCompletedCounter.incrementAndGet();
        if (resScanner != null) {
          resScanner.close();
        }
      }
    }
  }
}