/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

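/**
 * Test for {@link CompactedHFilesDischarger}: compacted store files must be archived only after
 * every scanner referencing them has been closed.
 */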
@Category({ MediumTests.class, RegionServerTests.class })
public class TestCompactedHFilesDischarger {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCompactedHFilesDischarger.class);

  private final HBaseTestingUtil testUtil = new HBaseTestingUtil();
  private HRegion region;
  private final static byte[] fam = Bytes.toBytes("cf_1");
  private final static byte[] qual1 = Bytes.toBytes("qf_1");
  private final static byte[] val = Bytes.toBytes("val");
  private static CountDownLatch latch = new CountDownLatch(3);
  private static AtomicInteger counter = new AtomicInteger(0);
  private static AtomicInteger scanCompletedCounter = new AtomicInteger(0);
  private RegionServerServices rss;

  @Before
  public void setUp() throws Exception {
    TableName tableName = TableName.valueOf(getClass().getSimpleName());
    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam)).build();
    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
    Path path = testUtil.getDataTestDir(getClass().getSimpleName());
    region =
      HBaseTestingUtil.createRegionAndWAL(info, path, testUtil.getConfiguration(), tableDescriptor);
    rss = mock(RegionServerServices.class);
    List<HRegion> regions = new ArrayList<>(1);
    regions.add(region);
    Mockito.doReturn(regions).when(rss).getRegions();
  }

  @After
  public void tearDown() throws IOException {
    counter.set(0);
    scanCompletedCounter.set(0);
    latch = new CountDownLatch(3);
    HBaseTestingUtil.closeRegionAndWAL(region);
    testUtil.cleanupTestDir();
  }

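  /**
   * Verifies that the discharger removes compacted store files once a compaction has marked them
   * as compacted away, and that it is a no-op before any compaction happens.
   */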
  @Test
  public void testCompactedHFilesCleaner() throws Exception {
    // Create the cleaner object
    CompactedHFilesDischarger cleaner =
      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
    // Add some data to the region and do some flushes
    for (int i = 1; i < 10; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);
    for (int i = 11; i < 20; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);
    for (int i = 21; i < 30; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);

    HStore store = region.getStore(fam);
    assertEquals(3, store.getStorefilesCount());

    Collection<HStoreFile> storefiles = store.getStorefiles();
    Collection<HStoreFile> compactedfiles =
      store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    // None of the files should be in compacted state.
    for (HStoreFile file : storefiles) {
      assertFalse(file.isCompactedAway());
    }
    // Run the cleaner before any compaction; nothing should change.
    cleaner.chore();
    storefiles = store.getStorefiles();
    // None of the files should be in compacted state.
    for (HStoreFile file : storefiles) {
      assertFalse(file.isCompactedAway());
    }
    // now do some compaction
    region.compact(true);
    // The flushed files remain on disk until the cleaner runs, but they should now be marked as
    // compacted away.
    assertEquals(1, store.getStorefilesCount());
    assertEquals(3, store.getStoreEngine().getStoreFileManager().getCompactedfiles().size());

    // Run the cleaner
    cleaner.chore();
    assertEquals(1, store.getStorefilesCount());
    storefiles = store.getStorefiles();
    for (HStoreFile file : storefiles) {
      // Should not be in compacted state
      assertFalse(file.isCompactedAway());
    }
    compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    assertTrue(compactedfiles.isEmpty());
  }

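  /**
   * Verifies that scanners opened after a compaction read only the new store file, so the
   * discharger can archive the compacted files right away.
   */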
  @Test
  public void testCleanerWithParallelScannersAfterCompaction() throws Exception {
    // Create the cleaner object
    CompactedHFilesDischarger cleaner =
      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
    // Add some data to the region and do some flushes
    for (int i = 1; i < 10; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);
    for (int i = 11; i < 20; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);
    for (int i = 21; i < 30; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);

    HStore store = region.getStore(fam);
    assertEquals(3, store.getStorefilesCount());

    Collection<HStoreFile> storefiles = store.getStorefiles();
    Collection<HStoreFile> compactedfiles =
      store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    // None of the files should be in compacted state.
    for (HStoreFile file : storefiles) {
      assertFalse(file.isCompactedAway());
    }
    // Do compaction
    region.compact(true);
    startScannerThreads();

    storefiles = store.getStorefiles();
    int usedReaderCount = 0;
    int unusedReaderCount = 0;
    for (HStoreFile file : storefiles) {
      if (file.getRefCount() == 3) {
        usedReaderCount++;
      }
    }
    compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    for (HStoreFile file : compactedfiles) {
      assertEquals("Refcount should be 0", 0, file.getRefCount());
      unusedReaderCount++;
    }
    // The compacted files still exist, but no reads are using them.
    assertEquals("unused reader count should be 3", 3, unusedReaderCount);
    assertEquals("used reader count should be 1", 1, usedReaderCount);
    // now run the cleaner
    cleaner.chore();
    countDown();
    assertEquals(1, store.getStorefilesCount());
    storefiles = store.getStorefiles();
    for (HStoreFile file : storefiles) {
      // Should not be in compacted state
      assertFalse(file.isCompactedAway());
    }
    compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    assertTrue(compactedfiles.isEmpty());
  }

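  /**
   * Verifies that compacted files referenced by scanners opened before the compaction are kept
   * until those scanners finish, and are cleaned up on a later discharger run.
   */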
  @Test
  public void testCleanerWithParallelScanners() throws Exception {
    // Create the cleaner object
    CompactedHFilesDischarger cleaner =
      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
    // Add some data to the region and do some flushes
    for (int i = 1; i < 10; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);
    for (int i = 11; i < 20; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);
    for (int i = 21; i < 30; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);

    HStore store = region.getStore(fam);
    assertEquals(3, store.getStorefilesCount());

    Collection<HStoreFile> storefiles = store.getStorefiles();
    Collection<HStoreFile> compactedfiles =
      store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    // None of the files should be in compacted state.
    for (HStoreFile file : storefiles) {
      assertFalse(file.isCompactedAway());
    }
    startScannerThreads();
    // Do compaction
    region.compact(true);

    storefiles = store.getStorefiles();
    int usedReaderCount = 0;
    int unusedReaderCount = 0;
    for (HStoreFile file : storefiles) {
      if (file.getRefCount() == 0) {
        unusedReaderCount++;
      }
    }
    compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    for (HStoreFile file : compactedfiles) {
      assertEquals("Refcount should be 3", 3, file.getRefCount());
      usedReaderCount++;
    }
    // The new file created by the compaction is not yet used by any scanner.
    assertEquals("unused reader count should be 1", 1, unusedReaderCount);
    assertEquals("used reader count should be 3", 3, usedReaderCount);
    // now run the cleaner
    cleaner.chore();
    countDown();
    // No change in the number of store files as none of the compacted files could be cleaned up
    assertEquals(1, store.getStorefilesCount());
    assertEquals(3, store.getStoreEngine().getStoreFileManager().getCompactedfiles().size());
    while (scanCompletedCounter.get() != 3) {
      Thread.sleep(100);
    }
    // reset
    latch = new CountDownLatch(3);
    scanCompletedCounter.set(0);
    counter.set(0);
    // Create new scanners; they should use only the new file created by the compaction.
    startScannerThreads();
    storefiles = store.getStorefiles();
    usedReaderCount = 0;
    unusedReaderCount = 0;
    for (HStoreFile file : storefiles) {
      if (file.getRefCount() == 3) {
        usedReaderCount++;
      }
    }
    compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    for (HStoreFile file : compactedfiles) {
      assertEquals("Refcount should be 0", 0, file.getRefCount());
      unusedReaderCount++;
    }
    // The compacted files still exist, but no reads are using them.
    assertEquals("unused reader count should be 3", 3, unusedReaderCount);
    assertEquals("used reader count should be 1", 1, usedReaderCount);
    countDown();
    while (scanCompletedCounter.get() != 3) {
      Thread.sleep(100);
    }
    // Run the cleaner again
    cleaner.chore();
    // Now the cleaner should be able to clear the compacted files since there are no active readers
    assertEquals(1, store.getStorefilesCount());
    storefiles = store.getStorefiles();
    for (HStoreFile file : storefiles) {
      // Should not be in compacted state
      assertFalse(file.isCompactedAway());
    }
    compactedfiles = store.getStoreEngine().getStoreFileManager().getCompactedfiles();
    assertTrue(compactedfiles.isEmpty());
  }

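  /**
   * Verifies that an in-progress scan keeps working even after the discharger archives the store
   * files that were compacted away underneath it.
   */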
  @Test
  public void testStoreFileMissing() throws Exception {
    // Write 3 records and create 3 store files.
    write("row1");
    region.flush(true);
    write("row2");
    region.flush(true);
    write("row3");
    region.flush(true);

    Scan scan = new Scan();
    scan.setCaching(1);
    RegionScanner scanner = region.getScanner(scan);
    List<Cell> res = new ArrayList<>();
    // Read first item
    scanner.next(res);
    assertEquals("row1", Bytes.toString(CellUtil.cloneRow(res.get(0))));
    res.clear();
    // Create a new file in between scan nexts
    write("row4");
    region.flush(true);

    // Compact the table
    region.compact(true);

    // Create the cleaner object
    CompactedHFilesDischarger cleaner =
      new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
    cleaner.chore();
    // Issue the next scan call; it should still succeed even though the files it started on have
    // been compacted and archived.
    scanner.next(res);
    assertEquals("row2", Bytes.toString(CellUtil.cloneRow(res.get(0))));

    scanner.close();
  }

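  /** Writes a single cell whose row key and value are both the given string. */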
  private void write(String rowKey) throws IOException {
    byte[] row = Bytes.toBytes(rowKey);
    Put put = new Put(row);
    put.addColumn(fam, qual1, row);
    region.put(put);
  }

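  /** Releases the three scanner threads blocked on the shared latch. */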
  protected void countDown() {
    // count down 3 times
    latch.countDown();
    latch.countDown();
    latch.countDown();
  }

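  /**
   * Starts three parallel scanner threads and blocks until each of them has opened a scanner and
   * read its first row.
   */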
  protected void startScannerThreads() throws InterruptedException {
    // Start parallel scan threads
    ScanThread[] scanThreads = new ScanThread[3];
    for (int i = 0; i < 3; i++) {
      scanThreads[i] = new ScanThread(region);
    }
    for (ScanThread thread : scanThreads) {
      thread.start();
    }
    while (counter.get() != 3) {
      Thread.sleep(100);
    }
  }

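  /**
   * Scanner thread that opens a scanner, reads one row, then waits on the shared latch before
   * draining the rest of the scan, so the test can control when scanner references are released.
   */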
  private static class ScanThread extends Thread {
    private final HRegion region;

    public ScanThread(HRegion region) {
      this.region = region;
    }

    @Override
    public void run() {
      try {
        initiateScan(region);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }

    private void initiateScan(HRegion region) throws IOException {
      Scan scan = new Scan();
      scan.setCaching(1);
      RegionScanner resScanner = null;
      try {
        resScanner = region.getScanner(scan);
        List<Cell> results = new ArrayList<>();
        boolean next = resScanner.next(results);
        try {
          // Signal that this scanner holds a reference, then wait for the test to release us.
          counter.incrementAndGet();
          latch.await();
        } catch (InterruptedException e) {
          // Restore the interrupt status and fall through to finish the scan.
          Thread.currentThread().interrupt();
        }
        while (next) {
          next = resScanner.next(results);
        }
      } finally {
        scanCompletedCounter.incrementAndGet();
        if (resScanner != null) {
          resScanner.close();
        }
      }
    }
  }
}