001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
023import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
024import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY;
025import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
026import static org.junit.Assert.assertEquals;
027import static org.junit.Assert.assertFalse;
028import static org.junit.Assert.assertThrows;
029import static org.junit.Assert.assertTrue;
030import static org.junit.Assert.fail;
031import static org.mockito.ArgumentMatchers.any;
032import static org.mockito.Mockito.doAnswer;
033import static org.mockito.Mockito.mock;
034import static org.mockito.Mockito.spy;
035import static org.mockito.Mockito.when;
036
037import java.io.IOException;
038import java.util.ArrayList;
039import java.util.Collection;
040import java.util.Collections;
041import java.util.List;
042import java.util.Optional;
043import java.util.concurrent.CountDownLatch;
044import java.util.concurrent.TimeUnit;
045import org.apache.hadoop.conf.Configuration;
046import org.apache.hadoop.fs.FSDataOutputStream;
047import org.apache.hadoop.fs.FileStatus;
048import org.apache.hadoop.fs.FileSystem;
049import org.apache.hadoop.fs.Path;
050import org.apache.hadoop.hbase.ChoreService;
051import org.apache.hadoop.hbase.HBaseClassTestRule;
052import org.apache.hadoop.hbase.HBaseConfiguration;
053import org.apache.hadoop.hbase.HBaseTestingUtility;
054import org.apache.hadoop.hbase.HColumnDescriptor;
055import org.apache.hadoop.hbase.HConstants;
056import org.apache.hadoop.hbase.HTableDescriptor;
057import org.apache.hadoop.hbase.HTestConst;
058import org.apache.hadoop.hbase.Waiter;
059import org.apache.hadoop.hbase.client.Delete;
060import org.apache.hadoop.hbase.client.Durability;
061import org.apache.hadoop.hbase.client.Put;
062import org.apache.hadoop.hbase.client.Table;
063import org.apache.hadoop.hbase.io.hfile.HFileScanner;
064import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
065import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
066import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
067import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
068import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
069import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
070import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
071import org.apache.hadoop.hbase.security.User;
072import org.apache.hadoop.hbase.testclassification.MediumTests;
073import org.apache.hadoop.hbase.testclassification.RegionServerTests;
074import org.apache.hadoop.hbase.util.Bytes;
075import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
076import org.apache.hadoop.hbase.util.Threads;
077import org.apache.hadoop.hbase.wal.WAL;
078import org.junit.After;
079import org.junit.Assume;
080import org.junit.Before;
081import org.junit.ClassRule;
082import org.junit.Rule;
083import org.junit.Test;
084import org.junit.experimental.categories.Category;
085import org.junit.rules.TestName;
086import org.mockito.Mockito;
087import org.mockito.invocation.InvocationOnMock;
088import org.mockito.stubbing.Answer;
089
090/**
091 * Test compaction framework and common functions
092 */
093@Category({ RegionServerTests.class, MediumTests.class })
094public class TestCompaction {
095
096  @ClassRule
097  public static final HBaseClassTestRule CLASS_RULE =
098    HBaseClassTestRule.forClass(TestCompaction.class);
099
100  @Rule
101  public TestName name = new TestName();
102  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
103  protected Configuration conf = UTIL.getConfiguration();
104
105  private HRegion r = null;
106  private HTableDescriptor htd = null;
107  private static final byte[] COLUMN_FAMILY = fam1;
108  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
109  private static final byte[] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
110  private int compactionThreshold;
111  private byte[] secondRowBytes, thirdRowBytes;
112  private static final long MAX_FILES_TO_COMPACT = 10;
113  private final byte[] FAMILY = Bytes.toBytes("cf");
114
115  /** constructor */
116  public TestCompaction() {
117    super();
118
119    // Set cache flush size to 1MB
120    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
121    conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
122    conf.setLong(HConstants.COMPACTION_SCANNER_SIZE_MAX, 10L);
123    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
124      NoLimitThroughputController.class.getName());
125    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
126
127    secondRowBytes = START_KEY_BYTES.clone();
128    // Increment the least significant character so we get to next row.
129    secondRowBytes[START_KEY_BYTES.length - 1]++;
130    thirdRowBytes = START_KEY_BYTES.clone();
131    thirdRowBytes[START_KEY_BYTES.length - 1] =
132      (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
133  }
134
135  @Before
136  public void setUp() throws Exception {
137    this.htd = UTIL.createTableDescriptor(name.getMethodName());
138    if (name.getMethodName().equals("testCompactionSeqId")) {
139      UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
140      UTIL.getConfiguration().set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
141        DummyCompactor.class.getName());
142      HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
143      hcd.setMaxVersions(65536);
144      this.htd.addFamily(hcd);
145    }
146    this.r = UTIL.createLocalHRegion(htd, null, null);
147  }
148
149  @After
150  public void tearDown() throws Exception {
151    WAL wal = r.getWAL();
152    this.r.close();
153    wal.close();
154  }
155
156  /**
157   * Verify that you can stop a long-running compaction (used during RS shutdown)
158   */
159  @Test
160  public void testInterruptCompactionBySize() throws Exception {
161    assertEquals(0, count());
162
163    // lower the polling interval for this test
164    conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 /* 10 KB */);
165
166    try {
167      // Create a couple store files w/ 15KB (over 10KB interval)
168      int jmax = (int) Math.ceil(15.0 / compactionThreshold);
169      byte[] pad = new byte[1000]; // 1 KB chunk
170      for (int i = 0; i < compactionThreshold; i++) {
171        Table loader = new RegionAsTable(r);
172        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
173        p.setDurability(Durability.SKIP_WAL);
174        for (int j = 0; j < jmax; j++) {
175          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
176        }
177        HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
178        loader.put(p);
179        r.flush(true);
180      }
181
182      HRegion spyR = spy(r);
183      doAnswer(new Answer<Object>() {
184        @Override
185        public Object answer(InvocationOnMock invocation) throws Throwable {
186          r.writestate.writesEnabled = false;
187          return invocation.callRealMethod();
188        }
189      }).when(spyR).doRegionCompactionPrep();
190
191      // force a minor compaction, but not before requesting a stop
192      spyR.compactStores();
193
194      // ensure that the compaction stopped, all old files are intact,
195      HStore s = r.getStore(COLUMN_FAMILY);
196      assertEquals(compactionThreshold, s.getStorefilesCount());
197      assertTrue(s.getStorefilesSize() > 15 * 1000);
198      // and no new store files persisted past compactStores()
199      // only one empty dir exists in temp dir
200      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
201      assertEquals(1, ls.length);
202      Path storeTempDir =
203        new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
204      assertTrue(r.getFilesystem().exists(storeTempDir));
205      ls = r.getFilesystem().listStatus(storeTempDir);
206      assertEquals(0, ls.length);
207    } finally {
208      // don't mess up future tests
209      r.writestate.writesEnabled = true;
210      conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);
211
212      // Delete all Store information once done using
213      for (int i = 0; i < compactionThreshold; i++) {
214        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
215        byte[][] famAndQf = { COLUMN_FAMILY, null };
216        delete.addFamily(famAndQf[0]);
217        r.delete(delete);
218      }
219      r.flush(true);
220
221      // Multiple versions allowed for an entry, so the delete isn't enough
222      // Lower TTL and expire to ensure that all our entries have been wiped
223      final int ttl = 1000;
224      for (HStore store : this.r.stores.values()) {
225        ScanInfo old = store.getScanInfo();
226        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
227        store.setScanInfo(si);
228      }
229      Thread.sleep(ttl);
230
231      r.compact(true);
232      assertEquals(0, count());
233    }
234  }
235
236  @Test
237  public void testInterruptCompactionByTime() throws Exception {
238    assertEquals(0, count());
239
240    // lower the polling interval for this test
241    conf.setLong(TIME_LIMIT_KEY, 1 /* 1ms */);
242
243    try {
244      // Create a couple store files w/ 15KB (over 10KB interval)
245      int jmax = (int) Math.ceil(15.0 / compactionThreshold);
246      byte[] pad = new byte[1000]; // 1 KB chunk
247      for (int i = 0; i < compactionThreshold; i++) {
248        Table loader = new RegionAsTable(r);
249        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
250        p.setDurability(Durability.SKIP_WAL);
251        for (int j = 0; j < jmax; j++) {
252          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
253        }
254        HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
255        loader.put(p);
256        r.flush(true);
257      }
258
259      HRegion spyR = spy(r);
260      doAnswer(new Answer() {
261        @Override
262        public Object answer(InvocationOnMock invocation) throws Throwable {
263          r.writestate.writesEnabled = false;
264          return invocation.callRealMethod();
265        }
266      }).when(spyR).doRegionCompactionPrep();
267
268      // force a minor compaction, but not before requesting a stop
269      spyR.compactStores();
270
271      // ensure that the compaction stopped, all old files are intact,
272      HStore s = r.getStore(COLUMN_FAMILY);
273      assertEquals(compactionThreshold, s.getStorefilesCount());
274      assertTrue(s.getStorefilesSize() > 15 * 1000);
275      // and no new store files persisted past compactStores()
276      // only one empty dir exists in temp dir
277      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
278      assertEquals(1, ls.length);
279      Path storeTempDir =
280        new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
281      assertTrue(r.getFilesystem().exists(storeTempDir));
282      ls = r.getFilesystem().listStatus(storeTempDir);
283      assertEquals(0, ls.length);
284    } finally {
285      // don't mess up future tests
286      r.writestate.writesEnabled = true;
287      conf.setLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */);
288
289      // Delete all Store information once done using
290      for (int i = 0; i < compactionThreshold; i++) {
291        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
292        byte[][] famAndQf = { COLUMN_FAMILY, null };
293        delete.addFamily(famAndQf[0]);
294        r.delete(delete);
295      }
296      r.flush(true);
297
298      // Multiple versions allowed for an entry, so the delete isn't enough
299      // Lower TTL and expire to ensure that all our entries have been wiped
300      final int ttl = 1000;
301      for (HStore store : this.r.stores.values()) {
302        ScanInfo old = store.getScanInfo();
303        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
304        store.setScanInfo(si);
305      }
306      Thread.sleep(ttl);
307
308      r.compact(true);
309      assertEquals(0, count());
310    }
311  }
312
313  private int count() throws IOException {
314    int count = 0;
315    for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
316      HFileScanner scanner = f.getReader().getScanner(false, false);
317      if (!scanner.seekTo()) {
318        continue;
319      }
320      do {
321        count++;
322      } while (scanner.next());
323    }
324    return count;
325  }
326
327  private void createStoreFile(final HRegion region) throws IOException {
328    createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
329  }
330
331  private void createStoreFile(final HRegion region, String family) throws IOException {
332    Table loader = new RegionAsTable(region);
333    HTestConst.addContent(loader, family);
334    region.flush(true);
335  }
336
337  @Test
338  public void testCompactionWithCorruptResult() throws Exception {
339    int nfiles = 10;
340    for (int i = 0; i < nfiles; i++) {
341      createStoreFile(r);
342    }
343    HStore store = r.getStore(COLUMN_FAMILY);
344
345    Collection<HStoreFile> storeFiles = store.getStorefiles();
346    DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor();
347    CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
348    tool.compact(request, NoLimitThroughputController.INSTANCE, null);
349
350    // Now lets corrupt the compacted file.
351    FileSystem fs = store.getFileSystem();
352    // default compaction policy created one and only one new compacted file
353    Path tmpPath = store.getRegionFileSystem().createTempName();
354    try (FSDataOutputStream stream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) {
355      stream.writeChars("CORRUPT FILE!!!!");
356    }
357    // The complete compaction should fail and the corrupt file should remain
358    // in the 'tmp' directory;
359    assertThrows(IOException.class, () -> store.doCompaction(null, null, null,
360      EnvironmentEdgeManager.currentTime(), Collections.singletonList(tmpPath)));
361    assertTrue(fs.exists(tmpPath));
362  }
363
364  /**
365   * Create a custom compaction request and be sure that we can track it through the queue, knowing
366   * when the compaction is completed.
367   */
368  @Test
369  public void testTrackingCompactionRequest() throws Exception {
370    // setup a compact/split thread on a mock server
371    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
372    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
373    CompactSplit thread = new CompactSplit(mockServer);
374    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
375
376    // setup a region/store with some files
377    HStore store = r.getStore(COLUMN_FAMILY);
378    createStoreFile(r);
379    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
380      createStoreFile(r);
381    }
382
383    CountDownLatch latch = new CountDownLatch(1);
384    Tracker tracker = new Tracker(latch);
385    thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, tracker, null);
386    // wait for the latch to complete.
387    latch.await();
388
389    thread.interruptIfNecessary();
390  }
391
392  @Test
393  public void testCompactionFailure() throws Exception {
394    // setup a compact/split thread on a mock server
395    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
396    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
397    CompactSplit thread = new CompactSplit(mockServer);
398    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
399
400    // setup a region/store with some files
401    HStore store = r.getStore(COLUMN_FAMILY);
402    createStoreFile(r);
403    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
404      createStoreFile(r);
405    }
406
407    HRegion mockRegion = Mockito.spy(r);
408    Mockito.when(mockRegion.checkSplit())
409      .thenThrow(new RuntimeException("Thrown intentionally by test!"));
410
411    try (MetricsRegionWrapperImpl metricsWrapper = new MetricsRegionWrapperImpl(r)) {
412
413      long preCompletedCount = metricsWrapper.getNumCompactionsCompleted();
414      long preFailedCount = metricsWrapper.getNumCompactionsFailed();
415
416      CountDownLatch latch = new CountDownLatch(1);
417      Tracker tracker = new Tracker(latch);
418      thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER, tracker,
419        null);
420      // wait for the latch to complete.
421      latch.await(120, TimeUnit.SECONDS);
422
423      // compaction should have completed and been marked as failed due to error in split request
424      long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
425      long postFailedCount = metricsWrapper.getNumCompactionsFailed();
426
427      assertTrue("Completed count should have increased (pre=" + preCompletedCount + ", post="
428        + postCompletedCount + ")", postCompletedCount > preCompletedCount);
429      assertTrue("Failed count should have increased (pre=" + preFailedCount + ", post="
430        + postFailedCount + ")", postFailedCount > preFailedCount);
431    }
432  }
433
434  /**
435   * Test no new Compaction requests are generated after calling stop compactions
436   */
437  @Test
438  public void testStopStartCompaction() throws IOException {
439    // setup a compact/split thread on a mock server
440    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
441    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
442    final CompactSplit thread = new CompactSplit(mockServer);
443    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
444    // setup a region/store with some files
445    HStore store = r.getStore(COLUMN_FAMILY);
446    createStoreFile(r);
447    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
448      createStoreFile(r);
449    }
450    thread.switchCompaction(false);
451    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
452      CompactionLifeCycleTracker.DUMMY, null);
453    assertFalse(thread.isCompactionsEnabled());
454    int longCompactions = thread.getLongCompactions().getActiveCount();
455    int shortCompactions = thread.getShortCompactions().getActiveCount();
456    assertEquals(
457      "longCompactions=" + longCompactions + "," + "shortCompactions=" + shortCompactions, 0,
458      longCompactions + shortCompactions);
459    thread.switchCompaction(true);
460    assertTrue(thread.isCompactionsEnabled());
461    // Make sure no compactions have run.
462    assertEquals(0, thread.getLongCompactions().getCompletedTaskCount()
463      + thread.getShortCompactions().getCompletedTaskCount());
464    // Request a compaction and make sure it is submitted successfully.
465    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
466      CompactionLifeCycleTracker.DUMMY, null);
467    // Wait until the compaction finishes.
468    Waiter.waitFor(UTIL.getConfiguration(), 5000,
469      (Waiter.Predicate<Exception>) () -> thread.getLongCompactions().getCompletedTaskCount()
470        + thread.getShortCompactions().getCompletedTaskCount() == 1);
471    // Make sure there are no compactions running.
472    assertEquals(0,
473      thread.getLongCompactions().getActiveCount() + thread.getShortCompactions().getActiveCount());
474  }
475
476  @Test
477  public void testInterruptingRunningCompactions() throws Exception {
478    // setup a compact/split thread on a mock server
479    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
480      WaitThroughPutController.class.getName());
481    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
482    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
483    CompactSplit thread = new CompactSplit(mockServer);
484
485    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
486
487    // setup a region/store with some files
488    HStore store = r.getStore(COLUMN_FAMILY);
489    int jmax = (int) Math.ceil(15.0 / compactionThreshold);
490    byte[] pad = new byte[1000]; // 1 KB chunk
491    for (int i = 0; i < compactionThreshold; i++) {
492      Table loader = new RegionAsTable(r);
493      Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
494      p.setDurability(Durability.SKIP_WAL);
495      for (int j = 0; j < jmax; j++) {
496        p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
497      }
498      HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
499      loader.put(p);
500      r.flush(true);
501    }
502    HStore s = r.getStore(COLUMN_FAMILY);
503    int initialFiles = s.getStorefilesCount();
504
505    thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER,
506      CompactionLifeCycleTracker.DUMMY, null);
507
508    Thread.sleep(3000);
509    thread.switchCompaction(false);
510    assertEquals(initialFiles, s.getStorefilesCount());
511    // don't mess up future tests
512    thread.switchCompaction(true);
513  }
514
515  /**
516   * HBASE-7947: Regression test to ensure adding to the correct list in the {@link CompactSplit}
517   * @throws Exception on failure
518   */
519  @Test
520  public void testMultipleCustomCompactionRequests() throws Exception {
521    // setup a compact/split thread on a mock server
522    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
523    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
524    CompactSplit thread = new CompactSplit(mockServer);
525    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
526
527    // setup a region/store with some files
528    int numStores = r.getStores().size();
529    CountDownLatch latch = new CountDownLatch(numStores);
530    Tracker tracker = new Tracker(latch);
531    // create some store files and setup requests for each store on which we want to do a
532    // compaction
533    for (HStore store : r.getStores()) {
534      createStoreFile(r, store.getColumnFamilyName());
535      createStoreFile(r, store.getColumnFamilyName());
536      createStoreFile(r, store.getColumnFamilyName());
537      thread.requestCompaction(r, store, "test mulitple custom comapctions", PRIORITY_USER, tracker,
538        null);
539    }
540    // wait for the latch to complete.
541    latch.await();
542
543    thread.interruptIfNecessary();
544  }
545
546  class StoreMockMaker extends StatefulStoreMockMaker {
547    public ArrayList<HStoreFile> compacting = new ArrayList<>();
548    public ArrayList<HStoreFile> notCompacting = new ArrayList<>();
549    private final ArrayList<Integer> results;
550
551    public StoreMockMaker(ArrayList<Integer> results) {
552      this.results = results;
553    }
554
555    public class TestCompactionContext extends CompactionContext {
556
557      private List<HStoreFile> selectedFiles;
558
559      public TestCompactionContext(List<HStoreFile> selectedFiles) {
560        super();
561        this.selectedFiles = selectedFiles;
562      }
563
564      @Override
565      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
566        return new ArrayList<>();
567      }
568
569      @Override
570      public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
571        boolean mayUseOffPeak, boolean forceMajor) throws IOException {
572        this.request = new CompactionRequestImpl(selectedFiles);
573        this.request.setPriority(getPriority());
574        return true;
575      }
576
577      @Override
578      public List<Path> compact(ThroughputController throughputController, User user)
579        throws IOException {
580        finishCompaction(this.selectedFiles);
581        return new ArrayList<>();
582      }
583    }
584
585    @Override
586    public synchronized Optional<CompactionContext> selectCompaction() {
587      CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting));
588      compacting.addAll(notCompacting);
589      notCompacting.clear();
590      try {
591        ctx.select(null, false, false, false);
592      } catch (IOException ex) {
593        fail("Shouldn't happen");
594      }
595      return Optional.of(ctx);
596    }
597
598    @Override
599    public synchronized void cancelCompaction(Object object) {
600      TestCompactionContext ctx = (TestCompactionContext) object;
601      compacting.removeAll(ctx.selectedFiles);
602      notCompacting.addAll(ctx.selectedFiles);
603    }
604
605    public synchronized void finishCompaction(List<HStoreFile> sfs) {
606      if (sfs.isEmpty()) return;
607      synchronized (results) {
608        results.add(sfs.size());
609      }
610      compacting.removeAll(sfs);
611    }
612
613    @Override
614    public int getPriority() {
615      return 7 - compacting.size() - notCompacting.size();
616    }
617  }
618
619  public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
620    BlockingCompactionContext blocked = null;
621
622    public class BlockingCompactionContext extends CompactionContext {
623      public volatile boolean isInCompact = false;
624
625      public void unblock() {
626        synchronized (this) {
627          this.notifyAll();
628        }
629      }
630
631      @Override
632      public List<Path> compact(ThroughputController throughputController, User user)
633        throws IOException {
634        try {
635          isInCompact = true;
636          synchronized (this) {
637            this.wait();
638          }
639        } catch (InterruptedException e) {
640          Assume.assumeNoException(e);
641        }
642        return new ArrayList<>();
643      }
644
645      @Override
646      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
647        return new ArrayList<>();
648      }
649
650      @Override
651      public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e)
652        throws IOException {
653        this.request = new CompactionRequestImpl(new ArrayList<>());
654        return true;
655      }
656    }
657
658    @Override
659    public Optional<CompactionContext> selectCompaction() {
660      this.blocked = new BlockingCompactionContext();
661      try {
662        this.blocked.select(null, false, false, false);
663      } catch (IOException ex) {
664        fail("Shouldn't happen");
665      }
666      return Optional.of(blocked);
667    }
668
669    @Override
670    public void cancelCompaction(Object object) {
671    }
672
673    @Override
674    public int getPriority() {
675      return Integer.MIN_VALUE; // some invalid value, see createStoreMock
676    }
677
678    public BlockingCompactionContext waitForBlocking() {
679      while (this.blocked == null || !this.blocked.isInCompact) {
680        Threads.sleepWithoutInterrupt(50);
681      }
682      BlockingCompactionContext ctx = this.blocked;
683      this.blocked = null;
684      return ctx;
685    }
686
687    @Override
688    public HStore createStoreMock(String name) throws Exception {
689      return createStoreMock(Integer.MIN_VALUE, name);
690    }
691
692    public HStore createStoreMock(int priority, String name) throws Exception {
693      // Override the mock to always return the specified priority.
694      HStore s = super.createStoreMock(name);
695      when(s.getCompactPriority()).thenReturn(priority);
696      return s;
697    }
698  }
699
700  /** Test compaction priority management and multiple compactions per store (HBASE-8665). */
701  @Test
702  public void testCompactionQueuePriorities() throws Exception {
703    // Setup a compact/split thread on a mock server.
704    final Configuration conf = HBaseConfiguration.create();
705    HRegionServer mockServer = mock(HRegionServer.class);
706    when(mockServer.isStopped()).thenReturn(false);
707    when(mockServer.getConfiguration()).thenReturn(conf);
708    when(mockServer.getChoreService()).thenReturn(new ChoreService("test"));
709    CompactSplit cst = new CompactSplit(mockServer);
710    when(mockServer.getCompactSplitThread()).thenReturn(cst);
711    // prevent large compaction thread pool stealing job from small compaction queue.
712    cst.shutdownLongCompactions();
713    // Set up the region mock that redirects compactions.
714    HRegion r = mock(HRegion.class);
715    when(r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() {
716      @Override
717      public Boolean answer(InvocationOnMock invocation) throws Throwable {
718        invocation.<CompactionContext> getArgument(0).compact(invocation.getArgument(2), null);
719        return true;
720      }
721    });
722
723    // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
724    ArrayList<Integer> results = new ArrayList<>();
725    StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
726    HStore store = sm.createStoreMock("store1");
727    HStore store2 = sm2.createStoreMock("store2");
728    BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();
729
730    // First, block the compaction thread so that we could muck with queue.
731    cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
732    BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();
733
734    // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
735    for (int i = 0; i < 4; ++i) {
736      sm.notCompacting.add(createFile());
737    }
738    cst.requestSystemCompaction(r, store, "s1-pri3");
739    for (int i = 0; i < 3; ++i) {
740      sm2.notCompacting.add(createFile());
741    }
742    cst.requestSystemCompaction(r, store2, "s2-pri4");
743    // Now add 2 more files to store1 and queue compaction - pri 1.
744    for (int i = 0; i < 2; ++i) {
745      sm.notCompacting.add(createFile());
746    }
747    cst.requestSystemCompaction(r, store, "s1-pri1");
748    // Finally add blocking compaction with priority 2.
749    cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");
750
751    // Unblock the blocking compaction; we should run pri1 and become block again in pri2.
752    currentBlock.unblock();
753    currentBlock = blocker.waitForBlocking();
754    // Pri1 should have "compacted" all 6 files.
755    assertEquals(1, results.size());
756    assertEquals(6, results.get(0).intValue());
757    // Add 2 files to store 1 (it has 2 files now).
758    for (int i = 0; i < 2; ++i) {
759      sm.notCompacting.add(createFile());
760    }
761    // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority
762    // is 5, however, so it must not preempt store 2. Add blocking compaction at the end.
763    cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
764    currentBlock.unblock();
765    currentBlock = blocker.waitForBlocking();
766    assertEquals(3, results.size());
767    assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files.
768    assertEquals(2, results.get(2).intValue());
769
770    currentBlock.unblock();
771    cst.interruptIfNecessary();
772  }
773
774  /**
775   * Firstly write 10 cells (with different time stamp) to a qualifier and flush to hfile1, then
776   * write 10 cells (with different time stamp) to the same qualifier and flush to hfile2. The
777   * latest cell (cell-A) in hfile1 and the oldest cell (cell-B) in hfile2 are with the same time
778   * stamp but different sequence id, and will get scanned successively during compaction.
779   * <p/>
780   * We set compaction.kv.max to 10 so compaction will scan 10 versions each round, meanwhile we set
781   * keepSeqIdPeriod=0 in {@link DummyCompactor} so all 10 versions of hfile2 will be written out
782   * with seqId cleaned (set to 0) including cell-B, then when scanner goes to cell-A it will cause
783   * a scan out-of-order assertion error before HBASE-16931 if error occurs during the test
784   */
785  @Test
786  public void testCompactionSeqId() throws Exception {
787    final byte[] ROW = Bytes.toBytes("row");
788    final byte[] QUALIFIER = Bytes.toBytes("qualifier");
789
790    long timestamp = 10000;
791
792    // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9
793    // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8
794    // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7
795    // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6
796    // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5
797    // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4
798    // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3
799    // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2
800    // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1
801    // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0
802    for (int i = 0; i < 10; i++) {
803      Put put = new Put(ROW);
804      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
805      r.put(put);
806    }
807    r.flush(true);
808
809    // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18
810    // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17
811    // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16
812    // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15
813    // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14
814    // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13
815    // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12
816    // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11
817    // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10
818    // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9
819    for (int i = 18; i > 8; i--) {
820      Put put = new Put(ROW);
821      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
822      r.put(put);
823    }
824    r.flush(true);
825    r.compact(true);
826  }
827
828  public static class DummyCompactor extends DefaultCompactor {
829    public DummyCompactor(Configuration conf, HStore store) {
830      super(conf, store);
831      this.keepSeqIdPeriod = 0;
832    }
833  }
834
835  private static HStoreFile createFile() throws Exception {
836    HStoreFile sf = mock(HStoreFile.class);
837    when(sf.getPath()).thenReturn(new Path("file"));
838    StoreFileReader r = mock(StoreFileReader.class);
839    when(r.length()).thenReturn(10L);
840    when(sf.getReader()).thenReturn(r);
841    return sf;
842  }
843
844  /**
845   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
846   * finishes.
847   */
848  public static class Tracker implements CompactionLifeCycleTracker {
849
850    private final CountDownLatch done;
851
852    public Tracker(CountDownLatch done) {
853      this.done = done;
854    }
855
856    @Override
857    public void afterExecution(Store store) {
858      done.countDown();
859    }
860  }
861
862  /**
863   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
864   * finishes.
865   */
866  public static class WaitThroughPutController extends NoLimitThroughputController {
867
868    public WaitThroughPutController() {
869    }
870
871    @Override
872    public long control(String compactionName, long size) throws InterruptedException {
873      Thread.sleep(6000000);
874      return 6000000;
875    }
876  }
877}