001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.Collection;
027import java.util.List;
028import java.util.Random;
029import java.util.stream.Collectors;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileStatus;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtility;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Admin;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
041import org.apache.hadoop.hbase.client.CompactionState;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.Result;
044import org.apache.hadoop.hbase.client.ResultScanner;
045import org.apache.hadoop.hbase.client.Table;
046import org.apache.hadoop.hbase.client.TableDescriptor;
047import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
048import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
049import org.apache.hadoop.hbase.testclassification.LargeTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.RegionSplitter;
052import org.junit.After;
053import org.junit.Before;
054import org.junit.ClassRule;
055import org.junit.Rule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058import org.junit.rules.TestName;
059import org.junit.runner.RunWith;
060import org.junit.runners.Parameterized;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
/**
 * Mob file compaction base test.
 * <ol>
 * <li>(Optional) Enables batch mode for regular MOB compaction and sets the batch size to 7
 * regions.</li>
 * <li>Disables periodic MOB compactions and sets the minimum age to archive to 10 sec.</li>
 * <li>Creates a MOB table with 20 regions.</li>
 * <li>Loads MOB data (randomized keys, 1000 rows) and flushes the data.</li>
 * <li>Repeats step 4 two more times.</li>
 * <li>Verifies that we have 20 * 3 = 60 MOB files (number of regions x 3).</li>
 * <li>Runs major MOB compaction.</li>
 * <li>Verifies that the number of MOB files in the MOB directory is 20 x 4 = 80.</li>
 * <li>Waits for a period of time larger than the minimum age to archive.</li>
 * <li>Runs the MOB cleaner chore.</li>
 * <li>Verifies that the number of MOB files in the MOB directory is 20.</li>
 * <li>Runs a scanner and checks all 3 * 1000 rows.</li>
 * </ol>
 */
074@RunWith(Parameterized.class)
075@Category(LargeTests.class)
076public class TestMobCompactionWithDefaults {
077  private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionWithDefaults.class);
078  @ClassRule
079  public static final HBaseClassTestRule CLASS_RULE =
080    HBaseClassTestRule.forClass(TestMobCompactionWithDefaults.class);
081
082  protected static HBaseTestingUtility HTU;
083  protected static Configuration conf;
084  protected static long minAgeToArchive = 10000;
085
086  protected final static String famStr = "f1";
087  protected final static byte[] fam = Bytes.toBytes(famStr);
088  protected final static byte[] qualifier = Bytes.toBytes("q1");
089  protected final static long mobLen = 10;
090  protected final static byte[] mobVal = Bytes
091    .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");
092
093  @Rule
094  public TestName test = new TestName();
095  protected TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor;
096  protected ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor;
097  protected Admin admin;
098  protected TableName table = null;
099  protected int numRegions = 20;
100  protected int rows = 1000;
101
102  protected Boolean useFileBasedSFT;
103
104  public TestMobCompactionWithDefaults(Boolean useFileBasedSFT) {
105    this.useFileBasedSFT = useFileBasedSFT;
106  }
107
108  @Parameterized.Parameters
109  public static Collection<Boolean> data() {
110    Boolean[] data = { false, true };
111    return Arrays.asList(data);
112  }
113
114  protected void htuStart() throws Exception {
115    HTU = new HBaseTestingUtility();
116    conf = HTU.getConfiguration();
117    conf.setInt("hfile.format.version", 3);
118    // Disable automatic MOB compaction
119    conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
120    // Disable automatic MOB file cleaner chore
121    conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
122    // Set minimum age to archive to 10 sec
123    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive);
124    // Set compacted file discharger interval to a half minAgeToArchive
125    conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2);
126    conf.setBoolean("hbase.regionserver.compaction.enabled", false);
127    if (useFileBasedSFT) {
128      conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
129        "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
130    }
131    additonalConfigSetup();
132    HTU.startMiniCluster();
133  }
134
135  protected void additonalConfigSetup() {
136  }
137
138  @Before
139  public void setUp() throws Exception {
140    htuStart();
141    tableDescriptor = HTU.createModifyableTableDescriptor(TestMobUtils.getTableName(test));
142    admin = HTU.getAdmin();
143    familyDescriptor = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam);
144    familyDescriptor.setMobEnabled(true);
145    familyDescriptor.setMobThreshold(mobLen);
146    familyDescriptor.setMaxVersions(1);
147    tableDescriptor.setColumnFamily(familyDescriptor);
148    RegionSplitter.UniformSplit splitAlgo = new RegionSplitter.UniformSplit();
149    byte[][] splitKeys = splitAlgo.split(numRegions);
150    table = HTU.createTable(tableDescriptor, splitKeys).getName();
151  }
152
153  private void loadData(TableName tableName, int num) {
154
155    Random r = new Random();
156    LOG.info("Started loading {} rows into {}", num, tableName);
157    try (final Table table = HTU.getConnection().getTable(tableName)) {
158      for (int i = 0; i < num; i++) {
159        byte[] key = new byte[32];
160        r.nextBytes(key);
161        Put p = new Put(key);
162        p.addColumn(fam, qualifier, mobVal);
163        table.put(p);
164      }
165      admin.flush(tableName);
166      LOG.info("Finished loading {} rows into {}", num, tableName);
167    } catch (Exception e) {
168      LOG.error("MOB file compaction chore test FAILED", e);
169      fail("MOB file compaction chore test FAILED");
170    }
171  }
172
173  @After
174  public void tearDown() throws Exception {
175    admin.disableTable(tableDescriptor.getTableName());
176    admin.deleteTable(tableDescriptor.getTableName());
177    HTU.shutdownMiniCluster();
178  }
179
180  @Test
181  public void baseTestMobFileCompaction() throws InterruptedException, IOException {
182    LOG.info("MOB compaction " + description() + " started");
183    loadAndFlushThreeTimes(rows, table, famStr);
184    mobCompact(tableDescriptor, familyDescriptor);
185    assertEquals("Should have 4 MOB files per region due to 3xflush + compaction.", numRegions * 4,
186      getNumberOfMobFiles(table, famStr));
187    cleanupAndVerifyCounts(table, famStr, 3 * rows);
188    LOG.info("MOB compaction " + description() + " finished OK");
189  }
190
191  @Test
192  public void testMobFileCompactionAfterSnapshotClone() throws InterruptedException, IOException {
193    final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) + "-clone");
194    LOG.info("MOB compaction of cloned snapshot, " + description() + " started");
195    loadAndFlushThreeTimes(rows, table, famStr);
196    LOG.debug("Taking snapshot and cloning table {}", table);
197    admin.snapshot(TestMobUtils.getTableName(test), table);
198    admin.cloneSnapshot(TestMobUtils.getTableName(test), clone);
199    assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
200      getNumberOfMobFiles(clone, famStr));
201    mobCompact(admin.getDescriptor(clone), familyDescriptor);
202    assertEquals("Should have 3 hlinks + 1 MOB file per region due to clone + compact",
203      4 * numRegions, getNumberOfMobFiles(clone, famStr));
204    cleanupAndVerifyCounts(clone, famStr, 3 * rows);
205    LOG.info("MOB compaction of cloned snapshot, " + description() + " finished OK");
206  }
207
208  @Test
209  public void testMobFileCompactionAfterSnapshotCloneAndFlush()
210    throws InterruptedException, IOException {
211    final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) + "-clone");
212    LOG.info("MOB compaction of cloned snapshot after flush, " + description() + " started");
213    loadAndFlushThreeTimes(rows, table, famStr);
214    LOG.debug("Taking snapshot and cloning table {}", table);
215    admin.snapshot(TestMobUtils.getTableName(test), table);
216    admin.cloneSnapshot(TestMobUtils.getTableName(test), clone);
217    assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
218      getNumberOfMobFiles(clone, famStr));
219    loadAndFlushThreeTimes(rows, clone, famStr);
220    mobCompact(admin.getDescriptor(clone), familyDescriptor);
221    assertEquals("Should have 7 MOB file per region due to clone + 3xflush + compact",
222      7 * numRegions, getNumberOfMobFiles(clone, famStr));
223    cleanupAndVerifyCounts(clone, famStr, 6 * rows);
224    LOG.info("MOB compaction of cloned snapshot w flush, " + description() + " finished OK");
225  }
226
227  protected void loadAndFlushThreeTimes(int rows, TableName table, String family)
228    throws IOException {
229    final long start = getNumberOfMobFiles(table, family);
230    // Load and flush data 3 times
231    loadData(table, rows);
232    loadData(table, rows);
233    loadData(table, rows);
234    assertEquals("Should have 3 more mob files per region from flushing.", start + numRegions * 3,
235      getNumberOfMobFiles(table, family));
236  }
237
238  protected String description() {
239    return "regular mode";
240  }
241
242  protected void enableCompactions() throws IOException {
243    final List<String> serverList =
244      admin.getRegionServers().stream().map(sn -> sn.getServerName()).collect(Collectors.toList());
245    admin.compactionSwitch(true, serverList);
246  }
247
248  protected void disableCompactions() throws IOException {
249    final List<String> serverList =
250      admin.getRegionServers().stream().map(sn -> sn.getServerName()).collect(Collectors.toList());
251    admin.compactionSwitch(false, serverList);
252  }
253
254  /**
255   * compact the given table and return once it is done. should presume compactions are disabled
256   * when called. should ensure compactions are disabled before returning.
257   */
258  protected void mobCompact(TableDescriptor tableDescriptor,
259    ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
260    LOG.debug("Major compact MOB table " + tableDescriptor.getTableName());
261    enableCompactions();
262    mobCompactImpl(tableDescriptor, familyDescriptor);
263    waitUntilCompactionIsComplete(tableDescriptor.getTableName());
264    disableCompactions();
265  }
266
267  /**
268   * Call the API for compaction specific to the test set. should not wait for compactions to
269   * finish. may assume compactions are enabled when called.
270   */
271  protected void mobCompactImpl(TableDescriptor tableDescriptor,
272    ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
273    admin.majorCompact(tableDescriptor.getTableName(), familyDescriptor.getName());
274  }
275
276  protected void waitUntilCompactionIsComplete(TableName table)
277    throws IOException, InterruptedException {
278    CompactionState state = admin.getCompactionState(table);
279    while (state != CompactionState.NONE) {
280      LOG.debug("Waiting for compaction on {} to complete. current state {}", table, state);
281      Thread.sleep(100);
282      state = admin.getCompactionState(table);
283    }
284    LOG.debug("done waiting for compaction on {}", table);
285  }
286
287  protected void cleanupAndVerifyCounts(TableName table, String family, int rows)
288    throws InterruptedException, IOException {
289    // We have guarantee, that compacted file discharger will run during this pause
290    // because it has interval less than this wait time
291    LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
292
293    Thread.sleep(minAgeToArchive + 1000);
294    LOG.info("Cleaning up MOB files");
295
296    // run cleaner chore on each RS
297    for (ServerName sn : admin.getRegionServers()) {
298      HTU.getMiniHBaseCluster().getRegionServer(sn).getRSMobFileCleanerChore().chore();
299    }
300
301    assertEquals("After cleaning, we should have 1 MOB file per region based on size.", numRegions,
302      getNumberOfMobFiles(table, family));
303
304    LOG.debug("checking count of rows");
305    long scanned = scanTable(table);
306    assertEquals("Got the wrong number of rows in table " + table + " cf " + family, rows, scanned);
307
308  }
309
310  protected long getNumberOfMobFiles(TableName tableName, String family) throws IOException {
311    FileSystem fs = FileSystem.get(conf);
312    Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
313    FileStatus[] stat = fs.listStatus(dir);
314    for (FileStatus st : stat) {
315      LOG.debug("MOB Directory content: {}", st.getPath());
316    }
317    LOG.debug("MOB Directory content total files: {}", stat.length);
318
319    return stat.length;
320  }
321
322  protected long scanTable(TableName tableName) {
323    try (final Table table = HTU.getConnection().getTable(tableName);
324      final ResultScanner scanner = table.getScanner(fam)) {
325      Result result;
326      long counter = 0;
327      while ((result = scanner.next()) != null) {
328        assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
329        counter++;
330      }
331      return counter;
332    } catch (Exception e) {
333      LOG.error("MOB file compaction test FAILED", e);
334      if (HTU != null) {
335        fail(e.getMessage());
336      } else {
337        System.exit(-1);
338      }
339    }
340    return 0;
341  }
342}