001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY;
021import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY_BYTES;
022import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
023import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
024import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY;
025import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
026import static org.junit.Assert.assertEquals;
027import static org.junit.Assert.assertFalse;
028import static org.junit.Assert.assertThrows;
029import static org.junit.Assert.assertTrue;
030import static org.junit.Assert.fail;
031import static org.mockito.ArgumentMatchers.any;
032import static org.mockito.Mockito.doAnswer;
033import static org.mockito.Mockito.mock;
034import static org.mockito.Mockito.spy;
035import static org.mockito.Mockito.when;
036
037import java.io.IOException;
038import java.util.ArrayList;
039import java.util.Collection;
040import java.util.Collections;
041import java.util.List;
042import java.util.Optional;
043import java.util.concurrent.CountDownLatch;
044import java.util.concurrent.TimeUnit;
045import org.apache.hadoop.conf.Configuration;
046import org.apache.hadoop.fs.FSDataOutputStream;
047import org.apache.hadoop.fs.FileStatus;
048import org.apache.hadoop.fs.FileSystem;
049import org.apache.hadoop.fs.Path;
050import org.apache.hadoop.hbase.ChoreService;
051import org.apache.hadoop.hbase.HBaseClassTestRule;
052import org.apache.hadoop.hbase.HBaseConfiguration;
053import org.apache.hadoop.hbase.HBaseTestingUtil;
054import org.apache.hadoop.hbase.HConstants;
055import org.apache.hadoop.hbase.HTestConst;
056import org.apache.hadoop.hbase.KeyValue;
057import org.apache.hadoop.hbase.Waiter;
058import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
059import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
060import org.apache.hadoop.hbase.client.Delete;
061import org.apache.hadoop.hbase.client.Durability;
062import org.apache.hadoop.hbase.client.Put;
063import org.apache.hadoop.hbase.client.Table;
064import org.apache.hadoop.hbase.client.TableDescriptor;
065import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
066import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
067import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
068import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
069import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
070import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
071import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
072import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
073import org.apache.hadoop.hbase.security.User;
074import org.apache.hadoop.hbase.testclassification.MediumTests;
075import org.apache.hadoop.hbase.testclassification.RegionServerTests;
076import org.apache.hadoop.hbase.util.Bytes;
077import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
078import org.apache.hadoop.hbase.util.Threads;
079import org.apache.hadoop.hbase.wal.WAL;
080import org.junit.After;
081import org.junit.Assume;
082import org.junit.Before;
083import org.junit.ClassRule;
084import org.junit.Rule;
085import org.junit.Test;
086import org.junit.experimental.categories.Category;
087import org.junit.rules.TestName;
088import org.mockito.Mockito;
089import org.mockito.invocation.InvocationOnMock;
090import org.mockito.stubbing.Answer;
091
092/**
093 * Test compaction framework and common functions
094 */
095@Category({ RegionServerTests.class, MediumTests.class })
096public class TestCompaction {
097
098  @ClassRule
099  public static final HBaseClassTestRule CLASS_RULE =
100    HBaseClassTestRule.forClass(TestCompaction.class);
101
102  @Rule
103  public TestName name = new TestName();
104  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
105  protected Configuration conf = UTIL.getConfiguration();
106
107  private HRegion r = null;
108  private TableDescriptor tableDescriptor = null;
109  private static final byte[] COLUMN_FAMILY = fam1;
110  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
111  private static final byte[] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
112  private int compactionThreshold;
113  private byte[] secondRowBytes, thirdRowBytes;
114  private static final long MAX_FILES_TO_COMPACT = 10;
115  private final byte[] FAMILY = Bytes.toBytes("cf");
116
117  /** constructor */
118  public TestCompaction() {
119    super();
120
121    // Set cache flush size to 1MB
122    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
123    conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
124    conf.setLong(HConstants.COMPACTION_SCANNER_SIZE_MAX, 10L);
125    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
126      NoLimitThroughputController.class.getName());
127    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
128
129    secondRowBytes = START_KEY_BYTES.clone();
130    // Increment the least significant character so we get to next row.
131    secondRowBytes[START_KEY_BYTES.length - 1]++;
132    thirdRowBytes = START_KEY_BYTES.clone();
133    thirdRowBytes[START_KEY_BYTES.length - 1] =
134      (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
135  }
136
137  @Before
138  public void setUp() throws Exception {
139    TableDescriptorBuilder builder = UTIL.createModifyableTableDescriptor(name.getMethodName());
140    if (name.getMethodName().equals("testCompactionSeqId")) {
141      UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
142      UTIL.getConfiguration().set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
143        DummyCompactor.class.getName());
144      ColumnFamilyDescriptor familyDescriptor =
145        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(65536).build();
146      builder.setColumnFamily(familyDescriptor);
147    }
148    this.tableDescriptor = builder.build();
149    this.r = UTIL.createLocalHRegion(tableDescriptor, null, null);
150  }
151
152  @After
153  public void tearDown() throws Exception {
154    WAL wal = r.getWAL();
155    this.r.close();
156    wal.close();
157  }
158
159  /**
160   * Verify that you can stop a long-running compaction (used during RS shutdown)
161   */
162  @Test
163  public void testInterruptCompactionBySize() throws Exception {
164    assertEquals(0, count());
165
166    // lower the polling interval for this test
167    conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 /* 10 KB */);
168
169    try {
170      // Create a couple store files w/ 15KB (over 10KB interval)
171      int jmax = (int) Math.ceil(15.0 / compactionThreshold);
172      byte[] pad = new byte[1000]; // 1 KB chunk
173      for (int i = 0; i < compactionThreshold; i++) {
174        Table loader = new RegionAsTable(r);
175        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
176        p.setDurability(Durability.SKIP_WAL);
177        for (int j = 0; j < jmax; j++) {
178          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
179        }
180        HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
181        loader.put(p);
182        r.flush(true);
183      }
184
185      HRegion spyR = spy(r);
186      doAnswer(new Answer<Object>() {
187        @Override
188        public Object answer(InvocationOnMock invocation) throws Throwable {
189          r.writestate.writesEnabled = false;
190          return invocation.callRealMethod();
191        }
192      }).when(spyR).doRegionCompactionPrep();
193
194      // force a minor compaction, but not before requesting a stop
195      spyR.compactStores();
196
197      // ensure that the compaction stopped, all old files are intact,
198      HStore s = r.getStore(COLUMN_FAMILY);
199      assertEquals(compactionThreshold, s.getStorefilesCount());
200      assertTrue(s.getStorefilesSize() > 15 * 1000);
201      // and no new store files persisted past compactStores()
202      // only one empty dir exists in temp dir
203      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
204      assertEquals(1, ls.length);
205      Path storeTempDir =
206        new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
207      assertTrue(r.getFilesystem().exists(storeTempDir));
208      ls = r.getFilesystem().listStatus(storeTempDir);
209      assertEquals(0, ls.length);
210    } finally {
211      // don't mess up future tests
212      r.writestate.writesEnabled = true;
213      conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);
214
215      // Delete all Store information once done using
216      for (int i = 0; i < compactionThreshold; i++) {
217        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
218        byte[][] famAndQf = { COLUMN_FAMILY, null };
219        delete.addFamily(famAndQf[0]);
220        r.delete(delete);
221      }
222      r.flush(true);
223
224      // Multiple versions allowed for an entry, so the delete isn't enough
225      // Lower TTL and expire to ensure that all our entries have been wiped
226      final int ttl = 1000;
227      for (HStore store : this.r.stores.values()) {
228        ScanInfo old = store.getScanInfo();
229        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
230        store.setScanInfo(si);
231      }
232      Thread.sleep(ttl);
233
234      r.compact(true);
235      assertEquals(0, count());
236    }
237  }
238
239  @Test
240  public void testInterruptCompactionByTime() throws Exception {
241    assertEquals(0, count());
242
243    // lower the polling interval for this test
244    conf.setLong(TIME_LIMIT_KEY, 1 /* 1ms */);
245
246    try {
247      // Create a couple store files w/ 15KB (over 10KB interval)
248      int jmax = (int) Math.ceil(15.0 / compactionThreshold);
249      byte[] pad = new byte[1000]; // 1 KB chunk
250      for (int i = 0; i < compactionThreshold; i++) {
251        Table loader = new RegionAsTable(r);
252        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
253        p.setDurability(Durability.SKIP_WAL);
254        for (int j = 0; j < jmax; j++) {
255          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
256        }
257        HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
258        loader.put(p);
259        r.flush(true);
260      }
261
262      HRegion spyR = spy(r);
263      doAnswer(new Answer<Object>() {
264        @Override
265        public Object answer(InvocationOnMock invocation) throws Throwable {
266          r.writestate.writesEnabled = false;
267          return invocation.callRealMethod();
268        }
269      }).when(spyR).doRegionCompactionPrep();
270
271      // force a minor compaction, but not before requesting a stop
272      spyR.compactStores();
273
274      // ensure that the compaction stopped, all old files are intact,
275      HStore s = r.getStore(COLUMN_FAMILY);
276      assertEquals(compactionThreshold, s.getStorefilesCount());
277      assertTrue(s.getStorefilesSize() > 15 * 1000);
278      // and no new store files persisted past compactStores()
279      // only one empty dir exists in temp dir
280      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
281      assertEquals(1, ls.length);
282      Path storeTempDir =
283        new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
284      assertTrue(r.getFilesystem().exists(storeTempDir));
285      ls = r.getFilesystem().listStatus(storeTempDir);
286      assertEquals(0, ls.length);
287    } finally {
288      // don't mess up future tests
289      r.writestate.writesEnabled = true;
290      conf.setLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */);
291
292      // Delete all Store information once done using
293      for (int i = 0; i < compactionThreshold; i++) {
294        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
295        byte[][] famAndQf = { COLUMN_FAMILY, null };
296        delete.addFamily(famAndQf[0]);
297        r.delete(delete);
298      }
299      r.flush(true);
300
301      // Multiple versions allowed for an entry, so the delete isn't enough
302      // Lower TTL and expire to ensure that all our entries have been wiped
303      final int ttl = 1000;
304      for (HStore store : this.r.stores.values()) {
305        ScanInfo old = store.getScanInfo();
306        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
307        store.setScanInfo(si);
308      }
309      Thread.sleep(ttl);
310
311      r.compact(true);
312      assertEquals(0, count());
313    }
314  }
315
316  private int count() throws IOException {
317    int count = 0;
318    for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
319      f.initReader();
320      try (StoreFileScanner scanner = f.getPreadScanner(false, Long.MAX_VALUE, 0, false)) {
321        scanner.seek(KeyValue.LOWESTKEY);
322        while (scanner.next() != null) {
323          count++;
324        }
325      }
326    }
327    return count;
328  }
329
330  private void createStoreFile(final HRegion region) throws IOException {
331    createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
332  }
333
334  private void createStoreFile(final HRegion region, String family) throws IOException {
335    Table loader = new RegionAsTable(region);
336    HTestConst.addContent(loader, family);
337    region.flush(true);
338  }
339
340  @Test
341  public void testCompactionWithCorruptResult() throws Exception {
342    int nfiles = 10;
343    for (int i = 0; i < nfiles; i++) {
344      createStoreFile(r);
345    }
346    HStore store = r.getStore(COLUMN_FAMILY);
347
348    Collection<HStoreFile> storeFiles = store.getStorefiles();
349    DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor();
350    CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
351    tool.compact(request, NoLimitThroughputController.INSTANCE, null);
352
353    // Now lets corrupt the compacted file.
354    FileSystem fs = store.getFileSystem();
355    // default compaction policy created one and only one new compacted file
356    Path tmpPath = store.getRegionFileSystem().createTempName();
357    try (FSDataOutputStream stream = fs.create(tmpPath, null, true, 512, (short) 3, 1024L, null)) {
358      stream.writeChars("CORRUPT FILE!!!!");
359    }
360    // The complete compaction should fail and the corrupt file should remain
361    // in the 'tmp' directory;
362    assertThrows(IOException.class, () -> store.doCompaction(null, null, null,
363      EnvironmentEdgeManager.currentTime(), Collections.singletonList(tmpPath)));
364    assertTrue(fs.exists(tmpPath));
365  }
366
367  /**
368   * Create a custom compaction request and be sure that we can track it through the queue, knowing
369   * when the compaction is completed.
370   */
371  @Test
372  public void testTrackingCompactionRequest() throws Exception {
373    // setup a compact/split thread on a mock server
374    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
375    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
376    CompactSplit thread = new CompactSplit(mockServer);
377    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
378
379    // setup a region/store with some files
380    HStore store = r.getStore(COLUMN_FAMILY);
381    createStoreFile(r);
382    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
383      createStoreFile(r);
384    }
385
386    CountDownLatch latch = new CountDownLatch(1);
387    Tracker tracker = new Tracker(latch);
388    thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, tracker, null);
389    // wait for the latch to complete.
390    latch.await();
391
392    thread.interruptIfNecessary();
393  }
394
395  @Test
396  public void testCompactionFailure() throws Exception {
397    // setup a compact/split thread on a mock server
398    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
399    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
400    CompactSplit thread = new CompactSplit(mockServer);
401    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
402
403    // setup a region/store with some files
404    HStore store = r.getStore(COLUMN_FAMILY);
405    createStoreFile(r);
406    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
407      createStoreFile(r);
408    }
409
410    HRegion mockRegion = Mockito.spy(r);
411    Mockito.when(mockRegion.checkSplit())
412      .thenThrow(new RuntimeException("Thrown intentionally by test!"));
413
414    try (MetricsRegionWrapperImpl metricsWrapper = new MetricsRegionWrapperImpl(r)) {
415
416      long preCompletedCount = metricsWrapper.getNumCompactionsCompleted();
417      long preFailedCount = metricsWrapper.getNumCompactionsFailed();
418
419      CountDownLatch latch = new CountDownLatch(1);
420      Tracker tracker = new Tracker(latch);
421      thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER, tracker,
422        null);
423      // wait for the latch to complete.
424      latch.await(120, TimeUnit.SECONDS);
425
426      // compaction should have completed and been marked as failed due to error in split request
427      long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
428      long postFailedCount = metricsWrapper.getNumCompactionsFailed();
429
430      assertTrue("Completed count should have increased (pre=" + preCompletedCount + ", post="
431        + postCompletedCount + ")", postCompletedCount > preCompletedCount);
432      assertTrue("Failed count should have increased (pre=" + preFailedCount + ", post="
433        + postFailedCount + ")", postFailedCount > preFailedCount);
434    }
435  }
436
437  /**
438   * Test no new Compaction requests are generated after calling stop compactions
439   */
440  @Test
441  public void testStopStartCompaction() throws IOException {
442    // setup a compact/split thread on a mock server
443    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
444    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
445    final CompactSplit thread = new CompactSplit(mockServer);
446    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
447    // setup a region/store with some files
448    HStore store = r.getStore(COLUMN_FAMILY);
449    createStoreFile(r);
450    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
451      createStoreFile(r);
452    }
453    thread.switchCompaction(false);
454    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
455      CompactionLifeCycleTracker.DUMMY, null);
456    assertFalse(thread.isCompactionsEnabled());
457    int longCompactions = thread.getLongCompactions().getActiveCount();
458    int shortCompactions = thread.getShortCompactions().getActiveCount();
459    assertEquals(
460      "longCompactions=" + longCompactions + "," + "shortCompactions=" + shortCompactions, 0,
461      longCompactions + shortCompactions);
462    thread.switchCompaction(true);
463    assertTrue(thread.isCompactionsEnabled());
464    // Make sure no compactions have run.
465    assertEquals(0, thread.getLongCompactions().getCompletedTaskCount()
466      + thread.getShortCompactions().getCompletedTaskCount());
467    // Request a compaction and make sure it is submitted successfully.
468    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
469      CompactionLifeCycleTracker.DUMMY, null);
470    // Wait until the compaction finishes.
471    Waiter.waitFor(UTIL.getConfiguration(), 5000,
472      (Waiter.Predicate<Exception>) () -> thread.getLongCompactions().getCompletedTaskCount()
473        + thread.getShortCompactions().getCompletedTaskCount() == 1);
474    // Make sure there are no compactions running.
475    assertEquals(0,
476      thread.getLongCompactions().getActiveCount() + thread.getShortCompactions().getActiveCount());
477  }
478
479  @Test
480  public void testInterruptingRunningCompactions() throws Exception {
481    // setup a compact/split thread on a mock server
482    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
483      WaitThroughPutController.class.getName());
484    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
485    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
486    CompactSplit thread = new CompactSplit(mockServer);
487
488    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
489
490    // setup a region/store with some files
491    HStore store = r.getStore(COLUMN_FAMILY);
492    int jmax = (int) Math.ceil(15.0 / compactionThreshold);
493    byte[] pad = new byte[1000]; // 1 KB chunk
494    for (int i = 0; i < compactionThreshold; i++) {
495      Table loader = new RegionAsTable(r);
496      Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
497      p.setDurability(Durability.SKIP_WAL);
498      for (int j = 0; j < jmax; j++) {
499        p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
500      }
501      HTestConst.addContent(loader, Bytes.toString(COLUMN_FAMILY));
502      loader.put(p);
503      r.flush(true);
504    }
505    HStore s = r.getStore(COLUMN_FAMILY);
506    int initialFiles = s.getStorefilesCount();
507
508    thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER,
509      CompactionLifeCycleTracker.DUMMY, null);
510
511    Thread.sleep(3000);
512    thread.switchCompaction(false);
513    assertEquals(initialFiles, s.getStorefilesCount());
514    // don't mess up future tests
515    thread.switchCompaction(true);
516  }
517
518  /**
519   * HBASE-7947: Regression test to ensure adding to the correct list in the {@link CompactSplit}
520   * @throws Exception on failure
521   */
522  @Test
523  public void testMultipleCustomCompactionRequests() throws Exception {
524    // setup a compact/split thread on a mock server
525    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
526    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
527    CompactSplit thread = new CompactSplit(mockServer);
528    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
529
530    // setup a region/store with some files
531    int numStores = r.getStores().size();
532    CountDownLatch latch = new CountDownLatch(numStores);
533    Tracker tracker = new Tracker(latch);
534    // create some store files and setup requests for each store on which we want to do a
535    // compaction
536    for (HStore store : r.getStores()) {
537      createStoreFile(r, store.getColumnFamilyName());
538      createStoreFile(r, store.getColumnFamilyName());
539      createStoreFile(r, store.getColumnFamilyName());
540      thread.requestCompaction(r, store, "test mulitple custom comapctions", PRIORITY_USER, tracker,
541        null);
542    }
543    // wait for the latch to complete.
544    latch.await();
545
546    thread.interruptIfNecessary();
547  }
548
549  class StoreMockMaker extends StatefulStoreMockMaker {
550    public ArrayList<HStoreFile> compacting = new ArrayList<>();
551    public ArrayList<HStoreFile> notCompacting = new ArrayList<>();
552    private final ArrayList<Integer> results;
553
554    public StoreMockMaker(ArrayList<Integer> results) {
555      this.results = results;
556    }
557
558    public class TestCompactionContext extends CompactionContext {
559
560      private List<HStoreFile> selectedFiles;
561
562      public TestCompactionContext(List<HStoreFile> selectedFiles) {
563        super();
564        this.selectedFiles = selectedFiles;
565      }
566
567      @Override
568      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
569        return new ArrayList<>();
570      }
571
572      @Override
573      public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
574        boolean mayUseOffPeak, boolean forceMajor) throws IOException {
575        this.request = new CompactionRequestImpl(selectedFiles);
576        this.request.setPriority(getPriority());
577        return true;
578      }
579
580      @Override
581      public List<Path> compact(ThroughputController throughputController, User user)
582        throws IOException {
583        finishCompaction(this.selectedFiles);
584        return new ArrayList<>();
585      }
586    }
587
588    @Override
589    public synchronized Optional<CompactionContext> selectCompaction() {
590      CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting));
591      compacting.addAll(notCompacting);
592      notCompacting.clear();
593      try {
594        ctx.select(null, false, false, false);
595      } catch (IOException ex) {
596        fail("Shouldn't happen");
597      }
598      return Optional.of(ctx);
599    }
600
601    @Override
602    public synchronized void cancelCompaction(Object object) {
603      TestCompactionContext ctx = (TestCompactionContext) object;
604      compacting.removeAll(ctx.selectedFiles);
605      notCompacting.addAll(ctx.selectedFiles);
606    }
607
608    public synchronized void finishCompaction(List<HStoreFile> sfs) {
609      if (sfs.isEmpty()) return;
610      synchronized (results) {
611        results.add(sfs.size());
612      }
613      compacting.removeAll(sfs);
614    }
615
616    @Override
617    public int getPriority() {
618      return 7 - compacting.size() - notCompacting.size();
619    }
620  }
621
622  public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
623    BlockingCompactionContext blocked = null;
624
625    public class BlockingCompactionContext extends CompactionContext {
626      public volatile boolean isInCompact = false;
627
628      public void unblock() {
629        synchronized (this) {
630          this.notifyAll();
631        }
632      }
633
634      @Override
635      public List<Path> compact(ThroughputController throughputController, User user)
636        throws IOException {
637        try {
638          isInCompact = true;
639          synchronized (this) {
640            this.wait();
641          }
642        } catch (InterruptedException e) {
643          Assume.assumeNoException(e);
644        }
645        return new ArrayList<>();
646      }
647
648      @Override
649      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
650        return new ArrayList<>();
651      }
652
653      @Override
654      public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e)
655        throws IOException {
656        this.request = new CompactionRequestImpl(new ArrayList<>());
657        return true;
658      }
659    }
660
661    @Override
662    public Optional<CompactionContext> selectCompaction() {
663      this.blocked = new BlockingCompactionContext();
664      try {
665        this.blocked.select(null, false, false, false);
666      } catch (IOException ex) {
667        fail("Shouldn't happen");
668      }
669      return Optional.of(blocked);
670    }
671
672    @Override
673    public void cancelCompaction(Object object) {
674    }
675
676    @Override
677    public int getPriority() {
678      return Integer.MIN_VALUE; // some invalid value, see createStoreMock
679    }
680
681    public BlockingCompactionContext waitForBlocking() {
682      while (this.blocked == null || !this.blocked.isInCompact) {
683        Threads.sleepWithoutInterrupt(50);
684      }
685      BlockingCompactionContext ctx = this.blocked;
686      this.blocked = null;
687      return ctx;
688    }
689
690    @Override
691    public HStore createStoreMock(String name) throws Exception {
692      return createStoreMock(Integer.MIN_VALUE, name);
693    }
694
695    public HStore createStoreMock(int priority, String name) throws Exception {
696      // Override the mock to always return the specified priority.
697      HStore s = super.createStoreMock(name);
698      when(s.getCompactPriority()).thenReturn(priority);
699      return s;
700    }
701  }
702
703  /** Test compaction priority management and multiple compactions per store (HBASE-8665). */
704  @Test
705  public void testCompactionQueuePriorities() throws Exception {
706    // Setup a compact/split thread on a mock server.
707    final Configuration conf = HBaseConfiguration.create();
708    HRegionServer mockServer = mock(HRegionServer.class);
709    when(mockServer.isStopped()).thenReturn(false);
710    when(mockServer.getConfiguration()).thenReturn(conf);
711    when(mockServer.getChoreService()).thenReturn(new ChoreService("test"));
712    CompactSplit cst = new CompactSplit(mockServer);
713    when(mockServer.getCompactSplitThread()).thenReturn(cst);
714    // prevent large compaction thread pool stealing job from small compaction queue.
715    cst.shutdownLongCompactions();
716    // Set up the region mock that redirects compactions.
717    HRegion r = mock(HRegion.class);
718    when(r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() {
719      @Override
720      public Boolean answer(InvocationOnMock invocation) throws Throwable {
721        invocation.<CompactionContext> getArgument(0).compact(invocation.getArgument(2), null);
722        return true;
723      }
724    });
725
726    // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
727    ArrayList<Integer> results = new ArrayList<>();
728    StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
729    HStore store = sm.createStoreMock("store1");
730    HStore store2 = sm2.createStoreMock("store2");
731    BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();
732
733    // First, block the compaction thread so that we could muck with queue.
734    cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
735    BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();
736
737    // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
738    for (int i = 0; i < 4; ++i) {
739      sm.notCompacting.add(createFile());
740    }
741    cst.requestSystemCompaction(r, store, "s1-pri3");
742    for (int i = 0; i < 3; ++i) {
743      sm2.notCompacting.add(createFile());
744    }
745    cst.requestSystemCompaction(r, store2, "s2-pri4");
746    // Now add 2 more files to store1 and queue compaction - pri 1.
747    for (int i = 0; i < 2; ++i) {
748      sm.notCompacting.add(createFile());
749    }
750    cst.requestSystemCompaction(r, store, "s1-pri1");
751    // Finally add blocking compaction with priority 2.
752    cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");
753
754    // Unblock the blocking compaction; we should run pri1 and become block again in pri2.
755    currentBlock.unblock();
756    currentBlock = blocker.waitForBlocking();
757    // Pri1 should have "compacted" all 6 files.
758    assertEquals(1, results.size());
759    assertEquals(6, results.get(0).intValue());
760    // Add 2 files to store 1 (it has 2 files now).
761    for (int i = 0; i < 2; ++i) {
762      sm.notCompacting.add(createFile());
763    }
764    // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority
765    // is 5, however, so it must not preempt store 2. Add blocking compaction at the end.
766    cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
767    currentBlock.unblock();
768    currentBlock = blocker.waitForBlocking();
769    assertEquals(3, results.size());
770    assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files.
771    assertEquals(2, results.get(2).intValue());
772
773    currentBlock.unblock();
774    cst.interruptIfNecessary();
775  }
776
777  /**
778   * Firstly write 10 cells (with different time stamp) to a qualifier and flush to hfile1, then
779   * write 10 cells (with different time stamp) to the same qualifier and flush to hfile2. The
780   * latest cell (cell-A) in hfile1 and the oldest cell (cell-B) in hfile2 are with the same time
781   * stamp but different sequence id, and will get scanned successively during compaction.
782   * <p/>
783   * We set compaction.kv.max to 10 so compaction will scan 10 versions each round, meanwhile we set
784   * keepSeqIdPeriod=0 in {@link DummyCompactor} so all 10 versions of hfile2 will be written out
785   * with seqId cleaned (set to 0) including cell-B, then when scanner goes to cell-A it will cause
786   * a scan out-of-order assertion error before HBASE-16931 if error occurs during the test
787   */
788  @Test
789  public void testCompactionSeqId() throws Exception {
790    final byte[] ROW = Bytes.toBytes("row");
791    final byte[] QUALIFIER = Bytes.toBytes("qualifier");
792
793    long timestamp = 10000;
794
795    // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9
796    // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8
797    // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7
798    // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6
799    // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5
800    // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4
801    // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3
802    // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2
803    // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1
804    // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0
805    for (int i = 0; i < 10; i++) {
806      Put put = new Put(ROW);
807      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
808      r.put(put);
809    }
810    r.flush(true);
811
812    // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18
813    // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17
814    // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16
815    // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15
816    // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14
817    // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13
818    // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12
819    // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11
820    // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10
821    // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9
822    for (int i = 18; i > 8; i--) {
823      Put put = new Put(ROW);
824      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
825      r.put(put);
826    }
827    r.flush(true);
828    r.compact(true);
829  }
830
831  public static class DummyCompactor extends DefaultCompactor {
832    public DummyCompactor(Configuration conf, HStore store) {
833      super(conf, store);
834      this.keepSeqIdPeriod = 0;
835    }
836  }
837
838  private static HStoreFile createFile() throws Exception {
839    HStoreFile sf = mock(HStoreFile.class);
840    when(sf.getPath()).thenReturn(new Path("file"));
841    StoreFileReader r = mock(StoreFileReader.class);
842    when(r.length()).thenReturn(10L);
843    when(sf.getReader()).thenReturn(r);
844    return sf;
845  }
846
847  /**
848   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
849   * finishes.
850   */
851  public static class Tracker implements CompactionLifeCycleTracker {
852
853    private final CountDownLatch done;
854
855    public Tracker(CountDownLatch done) {
856      this.done = done;
857    }
858
859    @Override
860    public void afterExecution(Store store) {
861      done.countDown();
862    }
863  }
864
865  /**
866   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
867   * finishes.
868   */
869  public static class WaitThroughPutController extends NoLimitThroughputController {
870
871    public WaitThroughPutController() {
872    }
873
874    @Override
875    public long control(String compactionName, long size) throws InterruptedException {
876      Thread.sleep(6000000);
877      return 6000000;
878    }
879  }
880}