/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY_BYTES;
import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY;
import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.LoggerFactory;

/**
 * Test compaction framework and common functions
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestCompaction {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCompaction.class);

  @Rule
  public TestName name = new TestName();
  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  protected Configuration conf = UTIL.getConfiguration();

  private HRegion r = null;
  private TableDescriptor tableDescriptor = null;
  private static final byte[] COLUMN_FAMILY = fam1;
  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
  private static final byte[] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
  private int compactionThreshold;
  private byte[] secondRowBytes, thirdRowBytes;
  private static final long MAX_FILES_TO_COMPACT = 10;
  private final byte[] FAMILY = Bytes.toBytes("cf");
  /** constructor */
  public TestCompaction() {
    // Set cache flush size to 1MB
    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
    conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
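    // Cap how much data a compaction scanner may return per call (10 bytes here), so compactions
    // proceed in many tiny batches rather than a few large ones.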
    conf.setLong(HConstants.COMPACTION_SCANNER_SIZE_MAX, 10L);
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      NoLimitThroughputController.class.getName());
    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);

    secondRowBytes = START_KEY_BYTES.clone();
    // Increment the least significant character so we get to the next row.
    secondRowBytes[START_KEY_BYTES.length - 1]++;
    thirdRowBytes = START_KEY_BYTES.clone();
    thirdRowBytes[START_KEY_BYTES.length - 1] =
      (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
  }

  @Before
  public void setUp() throws Exception {
    TableDescriptorBuilder builder = UTIL.createModifyableTableDescriptor(name.getMethodName());
    if (name.getMethodName().equals("testCompactionSeqId")) {
      UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
      UTIL.getConfiguration().set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
        DummyCompactor.class.getName());
      ColumnFamilyDescriptor familyDescriptor =
        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(65536).build();
      builder.setColumnFamily(familyDescriptor);
    }
    if (name.getMethodName().equals("testCompactionWithCorruptBlock")) {
      UTIL.getConfiguration().setBoolean("hbase.hstore.validate.read_fully", true);
      ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY)
        .setCompressionType(Compression.Algorithm.GZ).build();
      builder.setColumnFamily(familyDescriptor);
    }
    this.tableDescriptor = builder.build();
    this.r = UTIL.createLocalHRegion(tableDescriptor, null, null);
  }

  @After
  public void tearDown() throws Exception {
    WAL wal = r.getWAL();
    this.r.close();
    wal.close();
  }

  /**
   * Verify that a long-running compaction can be stopped (as happens during RS shutdown).
   */
  @Test
  public void testInterruptCompactionBySize() throws Exception {
    assertEquals(0, count());

    // lower the polling interval for this test
    conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 /* 10 KB */);

    try {
      // Create a couple of store files totaling ~15 KB (over the 10 KB close-check interval)
      int jmax = (int) Math.ceil(15.0 / compactionThreshold);
      byte[] pad = new byte[1000]; // 1 KB chunk
      for (int i = 0; i < compactionThreshold; i++) {
        Table loader = new RegionAsTable(r);
        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
        p.setDurability(Durability.SKIP_WAL);
        for (int j = 0; j < jmax; j++) {
          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
        }
        HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
        loader.put(p);
        r.flush(true);
      }

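      // Flip writesEnabled off as soon as compaction prep runs; the size-based CloseChecker,
      // consulted every SIZE_LIMIT_KEY bytes written, then sees the store as closing and aborts
      // the compaction partway through.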
      HRegion spyR = spy(r);
      doAnswer(new Answer<Object>() {
        @Override
        public Object answer(InvocationOnMock invocation) throws Throwable {
          r.writestate.writesEnabled = false;
          return invocation.callRealMethod();
        }
      }).when(spyR).doRegionCompactionPrep();

      // force a minor compaction, but not before requesting a stop
      spyR.compactStores();

      // ensure that the compaction stopped, all old files are intact,
      HStore s = r.getStore(COLUMN_FAMILY);
      assertEquals(compactionThreshold, s.getStorefilesCount());
      assertTrue(s.getStorefilesSize() > 15 * 1000);
      // and no new store files persisted past compactStores()
      // only one empty dir exists in temp dir
      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
      assertEquals(1, ls.length);
      Path storeTempDir =
        new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
      assertTrue(r.getFilesystem().exists(storeTempDir));
      ls = r.getFilesystem().listStatus(storeTempDir);
      assertEquals(0, ls.length);
    } finally {
      // don't mess up future tests
      r.writestate.writesEnabled = true;
      conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);

      // Delete all store data once we are done
      for (int i = 0; i < compactionThreshold; i++) {
        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
        byte[][] famAndQf = { COLUMN_FAMILY, null };
        delete.addFamily(famAndQf[0]);
        r.delete(delete);
      }
      r.flush(true);

      // Multiple versions are allowed per entry, so the delete alone isn't enough
      // Lower the TTL and let entries expire to ensure that all of them have been wiped
      final int ttl = 1000;
      for (HStore store : this.r.stores.values()) {
        ScanInfo old = store.getScanInfo();
        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
        store.setScanInfo(si);
      }
      Thread.sleep(ttl);

      r.compact(true);
      assertEquals(0, count());
    }
  }

  @Test
  public void testInterruptCompactionByTime() throws Exception {
    assertEquals(0, count());

    // lower the polling interval for this test
    conf.setLong(TIME_LIMIT_KEY, 1 /* 1ms */);

    try {
      // Create a couple of store files with ~15 KB of data
      int jmax = (int) Math.ceil(15.0 / compactionThreshold);
      byte[] pad = new byte[1000]; // 1 KB chunk
      for (int i = 0; i < compactionThreshold; i++) {
        Table loader = new RegionAsTable(r);
        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
        p.setDurability(Durability.SKIP_WAL);
        for (int j = 0; j < jmax; j++) {
          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
        }
        HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
        loader.put(p);
        r.flush(true);
      }

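      // Same trick as in testInterruptCompactionBySize, but here the time-based CloseChecker,
      // polled every TIME_LIMIT_KEY milliseconds, is what aborts the compaction.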
      HRegion spyR = spy(r);
      doAnswer(new Answer<Object>() {
        @Override
        public Object answer(InvocationOnMock invocation) throws Throwable {
          r.writestate.writesEnabled = false;
          return invocation.callRealMethod();
        }
      }).when(spyR).doRegionCompactionPrep();

      // force a minor compaction, but not before requesting a stop
      spyR.compactStores();

      // ensure that the compaction stopped, all old files are intact,
      HStore s = r.getStore(COLUMN_FAMILY);
      assertEquals(compactionThreshold, s.getStorefilesCount());
      assertTrue(s.getStorefilesSize() > 15 * 1000);
      // and no new store files persisted past compactStores()
      // only one empty dir exists in temp dir
      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
      assertEquals(1, ls.length);
      Path storeTempDir =
        new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
      assertTrue(r.getFilesystem().exists(storeTempDir));
      ls = r.getFilesystem().listStatus(storeTempDir);
      assertEquals(0, ls.length);
    } finally {
      // don't mess up future tests
      r.writestate.writesEnabled = true;
      conf.setLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */);

      // Delete all store data once we are done
      for (int i = 0; i < compactionThreshold; i++) {
        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
        byte[][] famAndQf = { COLUMN_FAMILY, null };
        delete.addFamily(famAndQf[0]);
        r.delete(delete);
      }
      r.flush(true);

      // Multiple versions are allowed per entry, so the delete alone isn't enough
      // Lower the TTL and let entries expire to ensure that all of them have been wiped
      final int ttl = 1000;
      for (HStore store : this.r.stores.values()) {
        ScanInfo old = store.getScanInfo();
        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
        store.setScanInfo(si);
      }
      Thread.sleep(ttl);

      r.compact(true);
      assertEquals(0, count());
    }
  }

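  /** Counts all cells across the store files of the default column family using pread scanners. */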
  private int count() throws IOException {
    int count = 0;
    for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
      f.initReader();
      try (StoreFileScanner scanner = f.getPreadScanner(false, Long.MAX_VALUE, 0, false)) {
        scanner.seek(KeyValue.LOWESTKEY);
        while (scanner.next() != null) {
          count++;
        }
      }
    }
    return count;
  }

  private void createStoreFile(final HRegion region) throws IOException {
    createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
  }

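  /** Loads some content into the given family and flushes, producing one new store file. */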
  private void createStoreFile(final HRegion region, String family) throws IOException {
    Table loader = new RegionAsTable(region);
    HTestConst.addContent(loader, family);
    region.flush(true);
  }

  @Test
  public void testCompactionWithCorruptResult() throws Exception {
    int nfiles = 10;
    for (int i = 0; i < nfiles; i++) {
      createStoreFile(r);
    }
    HStore store = r.getStore(COLUMN_FAMILY);

    Collection<HStoreFile> storeFiles = store.getStorefiles();
    DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor();
    CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
    tool.compact(request, NoLimitThroughputController.INSTANCE, null);

    // Now let's corrupt the compacted file.
    FileSystem fs = store.getFileSystem();
    // default compaction policy created one and only one new compacted file
    Path tmpPath = store.getRegionFileSystem().createTempName();
    try (FSDataOutputStream stream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) {
      stream.writeChars("CORRUPT FILE!!!!");
    }
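    // doCompaction treats tmpPath as the compactor's output and will try to commit it as a real
    // store file; opening the bogus content as an HFile is what is expected to fail.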

    // Completing the compaction should fail and the corrupt file should remain
    // in the 'tmp' directory.
    assertThrows(IOException.class, () -> store.doCompaction(null, null, null,
      EnvironmentEdgeManager.currentTime(), Collections.singletonList(tmpPath)));
    assertTrue(fs.exists(tmpPath));
  }

  /**
   * This test uses a hand-modified HFile, which is loaded from the test resources. That file was
   * generated from the test support code in this class and then edited to corrupt the GZ-encoded
   * block by zeroing out the first two bytes of the GZip header, the "standard declaration" of
   * {@code 1f 8b}, found at offset 33 in the file. It appears that in this test context CRC
   * checksums are not enforced, so the corruption manifests in the Decompressor rather than in the
   * reader when it loads the block bytes and compares them against the header.
   */
  @Test
  public void testCompactionWithCorruptBlock() throws Exception {
    createStoreFile(r, Bytes.toString(FAMILY));
    createStoreFile(r, Bytes.toString(FAMILY));
    HStore store = r.getStore(FAMILY);

    Collection<HStoreFile> storeFiles = store.getStorefiles();
    DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor();
    CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
    tool.compact(request, NoLimitThroughputController.INSTANCE, null);

    // insert the hfile with a corrupted data block into the region's tmp directory, where
    // compaction output is collected.
    FileSystem fs = store.getFileSystem();
    Path tmpPath = store.getRegionFileSystem().createTempName();
    try (
      InputStream inputStream =
        getClass().getResourceAsStream("TestCompaction_HFileWithCorruptBlock.gz");
      GZIPInputStream gzipInputStream = new GZIPInputStream(Objects.requireNonNull(inputStream));
      OutputStream outputStream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) {
      assertThat(gzipInputStream, notNullValue());
      assertThat(outputStream, notNullValue());
      IOUtils.copyBytes(gzipInputStream, outputStream, 512);
    }
    LoggerFactory.getLogger(TestCompaction.class).info("Wrote corrupted HFile to {}", tmpPath);

    // Completing the compaction should fail and the corrupt file should remain
    // in the 'tmp' directory.
    try {
      store.doCompaction(request, storeFiles, null, EnvironmentEdgeManager.currentTime(),
        Collections.singletonList(tmpPath));
    } catch (IOException e) {
      Throwable rootCause = e;
      while (rootCause.getCause() != null) {
        rootCause = rootCause.getCause();
      }
      assertThat(rootCause, allOf(instanceOf(IOException.class),
        hasProperty("message", containsString("not a gzip file"))));
      assertTrue(fs.exists(tmpPath));
      return;
    }
    fail("Compaction should have failed due to corrupt block");
  }
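
  /**
   * Not used by the test itself: a sketch of how a fixture like
   * TestCompaction_HFileWithCorruptBlock.gz could be regenerated, assuming (as the javadoc above
   * describes) that the first GZ-encoded block's GZip magic bytes {@code 1f 8b} sit at offset 33
   * of the HFile. The offset and the local file handling here are illustrative assumptions, not a
   * fixed contract.
   */
  @SuppressWarnings("unused")
  private static void corruptGzipBlockMagic(java.nio.file.Path localHFile) throws IOException {
    byte[] bytes = java.nio.file.Files.readAllBytes(localHFile);
    // Zero out the two-byte GZip "standard declaration" at the start of the first data block.
    bytes[33] = 0;
    bytes[34] = 0;
    java.nio.file.Files.write(localHFile, bytes);
  }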

  /**
   * Create a custom compaction request and be sure that we can track it through the queue, knowing
   * when the compaction is completed.
   */
  @Test
  public void testTrackingCompactionRequest() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
      createStoreFile(r);
    }

    CountDownLatch latch = new CountDownLatch(1);
    Tracker tracker = new Tracker(latch);
    thread.requestCompaction(r, store, "test custom compaction", PRIORITY_USER, tracker, null);
    // wait for the latch to complete.
    latch.await();

    thread.interruptIfNecessary();
  }

  @Test
  public void testCompactionFailure() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
      createStoreFile(r);
    }

    HRegion mockRegion = Mockito.spy(r);
    Mockito.when(mockRegion.checkSplit())
      .thenThrow(new RuntimeException("Thrown intentionally by test!"));

    try (MetricsRegionWrapperImpl metricsWrapper = new MetricsRegionWrapperImpl(r)) {

      long preCompletedCount = metricsWrapper.getNumCompactionsCompleted();
      long preFailedCount = metricsWrapper.getNumCompactionsFailed();

      CountDownLatch latch = new CountDownLatch(1);
      Tracker tracker = new Tracker(latch);
      thread.requestCompaction(mockRegion, store, "test custom compaction", PRIORITY_USER, tracker,
        null);
      // wait for the latch to complete.
      latch.await(120, TimeUnit.SECONDS);

      // compaction should have completed and been marked as failed due to the error in the
      // split request
      long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
      long postFailedCount = metricsWrapper.getNumCompactionsFailed();

      assertTrue("Completed count should have increased (pre=" + preCompletedCount + ", post="
        + postCompletedCount + ")", postCompletedCount > preCompletedCount);
      assertTrue("Failed count should have increased (pre=" + preFailedCount + ", post="
        + postFailedCount + ")", postFailedCount > preFailedCount);
    }
  }

  /**
   * Test that no compactions are executed after compactions have been switched off, and that they
   * run again once compactions are re-enabled.
   */
  @Test
  public void testStopStartCompaction() throws IOException {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    final CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
      createStoreFile(r);
    }
    thread.switchCompaction(false);
    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, null);
    assertFalse(thread.isCompactionsEnabled());
    int longCompactions = thread.getLongCompactions().getActiveCount();
    int shortCompactions = thread.getShortCompactions().getActiveCount();
    assertEquals(
      "longCompactions=" + longCompactions + "," + "shortCompactions=" + shortCompactions, 0,
      longCompactions + shortCompactions);
    thread.switchCompaction(true);
    assertTrue(thread.isCompactionsEnabled());
    // Make sure no compactions have run.
    assertEquals(0, thread.getLongCompactions().getCompletedTaskCount()
      + thread.getShortCompactions().getCompletedTaskCount());
    // Request a compaction and make sure it is submitted successfully.
    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, null);
    // Wait until the compaction finishes.
    Waiter.waitFor(UTIL.getConfiguration(), 5000,
      (Waiter.Predicate<Exception>) () -> thread.getLongCompactions().getCompletedTaskCount()
        + thread.getShortCompactions().getCompletedTaskCount() == 1);
    // Make sure there are no compactions running.
    assertEquals(0,
      thread.getLongCompactions().getActiveCount() + thread.getShortCompactions().getActiveCount());
  }

  @Test
  public void testInterruptingRunningCompactions() throws Exception {
    // setup a compact/split thread on a mock server
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      WaitThroughPutController.class.getName());
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);

    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    int jmax = (int) Math.ceil(15.0 / compactionThreshold);
    byte[] pad = new byte[1000]; // 1 KB chunk
    for (int i = 0; i < compactionThreshold; i++) {
      Table loader = new RegionAsTable(r);
      Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
      p.setDurability(Durability.SKIP_WAL);
      for (int j = 0; j < jmax; j++) {
        p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
      }
      HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
      loader.put(p);
      r.flush(true);
    }
    HStore s = r.getStore(COLUMN_FAMILY);
    int initialFiles = s.getStorefilesCount();

    thread.requestCompaction(r, store, "test custom compaction", PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, null);

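    // Give the compaction time to start; it then sits inside WaitThroughPutController#control.
    // Switching compactions off interrupts it, so the store file count must be unchanged.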
    Thread.sleep(3000);
    thread.switchCompaction(false);
    assertEquals(initialFiles, s.getStorefilesCount());
    // don't mess up future tests
    thread.switchCompaction(true);
  }

  /**
   * HBASE-7947: Regression test to ensure we add requests to the correct list in the
   * {@link CompactSplit}
   * @throws Exception on failure
   */
  @Test
  public void testMultipleCustomCompactionRequests() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    int numStores = r.getStores().size();
    CountDownLatch latch = new CountDownLatch(numStores);
    Tracker tracker = new Tracker(latch);
    // create some store files and setup requests for each store on which we want to do a
    // compaction
    for (HStore store : r.getStores()) {
      createStoreFile(r, store.getColumnFamilyName());
      createStoreFile(r, store.getColumnFamilyName());
      createStoreFile(r, store.getColumnFamilyName());
      thread.requestCompaction(r, store, "test multiple custom compactions", PRIORITY_USER, tracker,
        null);
    }
    // wait for the latch to complete.
    latch.await();

    thread.interruptIfNecessary();
  }

  class StoreMockMaker extends StatefulStoreMockMaker {
    public ArrayList<HStoreFile> compacting = new ArrayList<>();
    public ArrayList<HStoreFile> notCompacting = new ArrayList<>();
    private final ArrayList<Integer> results;

    public StoreMockMaker(ArrayList<Integer> results) {
      this.results = results;
    }

    public class TestCompactionContext extends CompactionContext {

      private List<HStoreFile> selectedFiles;

      public TestCompactionContext(List<HStoreFile> selectedFiles) {
        super();
        this.selectedFiles = selectedFiles;
      }

      @Override
      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
        return new ArrayList<>();
      }

      @Override
      public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
        boolean mayUseOffPeak, boolean forceMajor) throws IOException {
        this.request = new CompactionRequestImpl(selectedFiles);
        this.request.setPriority(getPriority());
        return true;
      }

      @Override
      public List<Path> compact(ThroughputController throughputController, User user)
        throws IOException {
        finishCompaction(this.selectedFiles);
        return new ArrayList<>();
      }
    }

    @Override
    public synchronized Optional<CompactionContext> selectCompaction() {
      CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting));
      compacting.addAll(notCompacting);
      notCompacting.clear();
      try {
        ctx.select(null, false, false, false);
      } catch (IOException ex) {
        fail("Shouldn't happen");
      }
      return Optional.of(ctx);
    }

    @Override
    public synchronized void cancelCompaction(Object object) {
      TestCompactionContext ctx = (TestCompactionContext) object;
      compacting.removeAll(ctx.selectedFiles);
      notCompacting.addAll(ctx.selectedFiles);
    }

    public synchronized void finishCompaction(List<HStoreFile> sfs) {
      if (sfs.isEmpty()) return;
      synchronized (results) {
        results.add(sfs.size());
      }
      compacting.removeAll(sfs);
    }

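    // Mimics HStore#getCompactPriority(): the more files the store holds, the smaller (more
    // urgent) the returned priority; 7 stands in for a blocking-file-count ceiling in this mock.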
    @Override
    public int getPriority() {
      return 7 - compacting.size() - notCompacting.size();
    }
  }

  public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
    BlockingCompactionContext blocked = null;

    public class BlockingCompactionContext extends CompactionContext {
      public volatile boolean isInCompact = false;

      public void unblock() {
        synchronized (this) {
          this.notifyAll();
        }
      }

      @Override
      public List<Path> compact(ThroughputController throughputController, User user)
        throws IOException {
        try {
          isInCompact = true;
          synchronized (this) {
            this.wait();
          }
        } catch (InterruptedException e) {
          Assume.assumeNoException(e);
        }
        return new ArrayList<>();
      }

      @Override
      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
        return new ArrayList<>();
      }

      @Override
      public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e)
        throws IOException {
        this.request = new CompactionRequestImpl(new ArrayList<>());
        return true;
      }
    }

    @Override
    public Optional<CompactionContext> selectCompaction() {
      this.blocked = new BlockingCompactionContext();
      try {
        this.blocked.select(null, false, false, false);
      } catch (IOException ex) {
        fail("Shouldn't happen");
      }
      return Optional.of(blocked);
    }

    @Override
    public void cancelCompaction(Object object) {
    }

    @Override
    public int getPriority() {
      return Integer.MIN_VALUE; // some invalid value, see createStoreMock
    }

    public BlockingCompactionContext waitForBlocking() {
      while (this.blocked == null || !this.blocked.isInCompact) {
        Threads.sleepWithoutInterrupt(50);
      }
      BlockingCompactionContext ctx = this.blocked;
      this.blocked = null;
      return ctx;
    }

    @Override
    public HStore createStoreMock(String name) throws Exception {
      return createStoreMock(Integer.MIN_VALUE, name);
    }

    public HStore createStoreMock(int priority, String name) throws Exception {
      // Override the mock to always return the specified priority.
      HStore s = super.createStoreMock(name);
      when(s.getCompactPriority()).thenReturn(priority);
      return s;
    }
  }

  /** Test compaction priority management and multiple compactions per store (HBASE-8665). */
  @Test
  public void testCompactionQueuePriorities() throws Exception {
    // Setup a compact/split thread on a mock server.
    final Configuration conf = HBaseConfiguration.create();
    HRegionServer mockServer = mock(HRegionServer.class);
    when(mockServer.isStopped()).thenReturn(false);
    when(mockServer.getConfiguration()).thenReturn(conf);
    when(mockServer.getChoreService()).thenReturn(new ChoreService("test"));
    CompactSplit cst = new CompactSplit(mockServer);
    when(mockServer.getCompactSplitThread()).thenReturn(cst);
    // Prevent the large compaction pool from stealing jobs from the small compaction queue.
    cst.shutdownLongCompactions();
    // Set up the region mock that redirects compactions.
    HRegion r = mock(HRegion.class);
    when(r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() {
      @Override
      public Boolean answer(InvocationOnMock invocation) throws Throwable {
        invocation.<CompactionContext> getArgument(0).compact(invocation.getArgument(2), null);
        return true;
      }
    });
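    // The mocked region forwards HRegion#compact straight to the CompactionContext, so the
    // TestCompactionContext/BlockingCompactionContext instances below run in place of a real
    // compaction.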

    // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
    ArrayList<Integer> results = new ArrayList<>();
    StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
    HStore store = sm.createStoreMock("store1");
    HStore store2 = sm2.createStoreMock("store2");
    BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();

    // First, block the compaction thread so that we can muck with the queue.
    cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
    BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();

    // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
    for (int i = 0; i < 4; ++i) {
      sm.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store, "s1-pri3");
    for (int i = 0; i < 3; ++i) {
      sm2.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store2, "s2-pri4");
    // Now add 2 more files to store1 and queue compaction - pri 1.
    for (int i = 0; i < 2; ++i) {
      sm.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store, "s1-pri1");
    // Finally, add a blocking compaction with priority 2.
    cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");

    // Unblock the blocking compaction; we should run pri-1 and then block again on pri-2.
    currentBlock.unblock();
    currentBlock = blocker.waitForBlocking();
    // Pri-1 should have "compacted" all 6 files.
    assertEquals(1, results.size());
    assertEquals(6, results.get(0).intValue());
    // Add 2 files to store 1 (it has 2 files now).
    for (int i = 0; i < 2; ++i) {
      sm.notCompacting.add(createFile());
    }
    // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority
    // is 5, however, so it must not preempt store 2. Add a blocking compaction at the end.
    cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
    currentBlock.unblock();
    currentBlock = blocker.waitForBlocking();
    assertEquals(3, results.size());
    assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files.
    assertEquals(2, results.get(2).intValue());

    currentBlock.unblock();
    cst.interruptIfNecessary();
  }

  /**
   * First write 10 cells (with different timestamps) to a qualifier and flush to hfile1, then
   * write 10 more cells (with different timestamps) to the same qualifier and flush to hfile2.
   * The latest cell (cell-A) in hfile1 and the oldest cell (cell-B) in hfile2 have the same
   * timestamp but different sequence ids, and will get scanned successively during compaction.
   * <p/>
   * We set compaction.kv.max to 10 so compaction will scan 10 versions each round, meanwhile we set
   * keepSeqIdPeriod=0 in {@link DummyCompactor} so all 10 versions of hfile2 will be written out
   * with seqId cleaned (set to 0) including cell-B. When the scanner then reaches cell-A, this
   * would trigger a scan out-of-order assertion error prior to the HBASE-16931 fix; the test fails
   * if such an error occurs.
   */
  @Test
  public void testCompactionSeqId() throws Exception {
    final byte[] ROW = Bytes.toBytes("row");
    final byte[] QUALIFIER = Bytes.toBytes("qualifier");

    long timestamp = 10000;

    // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9
    // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8
    // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7
    // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6
    // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5
    // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4
    // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3
    // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2
    // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1
    // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0
    for (int i = 0; i < 10; i++) {
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
      r.put(put);
    }
    r.flush(true);

    // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18
    // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17
    // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16
    // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15
    // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14
    // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13
    // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12
    // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11
    // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10
    // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9
    for (int i = 18; i > 8; i--) {
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
      r.put(put);
    }
    r.flush(true);
    r.compact(true);
  }

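  /**
   * Compactor used by testCompactionSeqId: keepSeqIdPeriod=0 makes the compactor clean (zero out)
   * sequence ids immediately instead of preserving them for the usual retention period.
   */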
  public static class DummyCompactor extends DefaultCompactor {
    public DummyCompactor(Configuration conf, HStore store) {
      super(conf, store);
      this.keepSeqIdPeriod = 0;
    }
  }

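  /** Mock HStoreFile with a fixed 10-byte reader length, sufficient for the queue-priority math. */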
  private static HStoreFile createFile() throws Exception {
    HStoreFile sf = mock(HStoreFile.class);
    when(sf.getPath()).thenReturn(new Path("file"));
    StoreFileReader r = mock(StoreFileReader.class);
    when(r.length()).thenReturn(10L);
    when(sf.getReader()).thenReturn(r);
    return sf;
  }

  /**
   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
   * finishes.
   */
  public static class Tracker implements CompactionLifeCycleTracker {

    private final CountDownLatch done;

    public Tracker(CountDownLatch done) {
      this.done = done;
    }

    @Override
    public void afterExecution(Store store) {
      done.countDown();
    }
  }

  /**
   * Simple {@link ThroughputController} that blocks inside {@code control} for a very long time,
   * keeping a compaction running until the test interrupts it.
   */
  public static class WaitThroughPutController extends NoLimitThroughputController {

    public WaitThroughPutController() {
    }

    @Override
    public long control(String compactionName, long size) throws InterruptedException {
      Thread.sleep(6000000);
      return 6000000;
    }
  }
}