/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Mob file compaction base test.
 * <ol>
 * <li>Enables batch mode for regular MOB compaction and sets the batch size to 7 regions
 * (optional).</li>
 * <li>Disables periodic MOB compactions and sets the minimum age to archive to 10 sec.</li>
 * <li>Creates a MOB table with 20 regions.</li>
 * <li>Loads MOB data (randomized keys, 1000 rows) and flushes it.</li>
 * <li>Repeats step 4 two more times.</li>
 * <li>Verifies that there are 20 x 3 = 60 MOB files (number of regions x 3).</li>
 * <li>Runs major MOB compaction.</li>
 * <li>Verifies that the number of MOB files in the MOB directory is 20 x 4 = 80.</li>
 * <li>Waits for a period of time larger than the minimum age to archive.</li>
 * <li>Runs the MOB cleaner chore.</li>
 * <li>Verifies that the number of MOB files in the MOB directory is 20.</li>
 * <li>Runs a scanner and checks all 3 x 1000 rows.</li>
 * </ol>
 */
@RunWith(Parameterized.class)
@Category(LargeTests.class)
public class TestMobCompactionWithDefaults {
  private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionWithDefaults.class);
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMobCompactionWithDefaults.class);

  protected HBaseTestingUtil HTU;
  protected static Configuration conf;
  protected static long minAgeToArchive = 10000;

  protected final static String famStr = "f1";
  protected final static byte[] fam = Bytes.toBytes(famStr);
  protected final static byte[] qualifier = Bytes.toBytes("q1");
  protected final static long mobLen = 10;
  protected final static byte[] mobVal = Bytes
    .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");

  @Rule
  public TestName test = new TestName();
  protected TableDescriptor tableDescriptor;
  private ColumnFamilyDescriptor familyDescriptor;
  protected Admin admin;
  protected TableName table = null;
  protected int numRegions = 20;
  protected int rows = 1000;

  protected Boolean useFileBasedSFT;

  public TestMobCompactionWithDefaults(Boolean useFileBasedSFT) {
    this.useFileBasedSFT = useFileBasedSFT;
  }

  @Parameterized.Parameters
  public static Collection<Boolean> data() {
    Boolean[] data = { false, true };
    return Arrays.asList(data);
  }

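  /**
   * Starts a mini cluster configured for MOB testing: HFile format v3, periodic MOB compaction
   * and MOB file cleaner chores disabled, minimum age to archive set to {@code minAgeToArchive},
   * the compacted file discharger interval set to half of that value, and regular compactions
   * switched off so tests trigger compaction explicitly. Optionally enables the file based store
   * file tracker, depending on the test parameter.
   */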
  protected void htuStart() throws Exception {
    HTU = new HBaseTestingUtil();
    conf = HTU.getConfiguration();
    conf.setInt("hfile.format.version", 3);
    // Disable automatic MOB compaction
    conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
    // Disable automatic MOB file cleaner chore
    conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
    // Set minimum age to archive to 10 sec
    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive);
    // Set compacted file discharger interval to half of minAgeToArchive
    conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2);
    conf.setBoolean("hbase.regionserver.compaction.enabled", false);
    if (useFileBasedSFT) {
      conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
        "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
    }
    additonalConfigSetup();
    HTU.startMiniCluster();
  }

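  /**
   * Extension point for subclasses to apply additional configuration before the mini cluster
   * starts. The default implementation is a no-op.
   */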
  protected void additonalConfigSetup() {
  }

  @Before
  public void setUp() throws Exception {
    htuStart();
    admin = HTU.getAdmin();
    familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true)
      .setMobThreshold(mobLen).setMaxVersions(1).build();
    tableDescriptor = HTU.createModifyableTableDescriptor(TestMobUtils.getTableName(test))
      .setColumnFamily(familyDescriptor).build();
    RegionSplitter.UniformSplit splitAlgo = new RegionSplitter.UniformSplit();
    byte[][] splitKeys = splitAlgo.split(numRegions);
    table = HTU.createTable(tableDescriptor, splitKeys).getName();
  }

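  /**
   * Writes {@code num} rows with random 32-byte keys and a MOB-sized value into the given table,
   * then flushes it; with randomized keys every region is expected to produce a new MOB file.
   */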
  private void loadData(TableName tableName, int num) {
    LOG.info("Started loading {} rows into {}", num, tableName);
    try (final Table table = HTU.getConnection().getTable(tableName)) {
      for (int i = 0; i < num; i++) {
        byte[] key = new byte[32];
        Bytes.random(key);
        Put p = new Put(key);
        p.addColumn(fam, qualifier, mobVal);
        table.put(p);
      }
      admin.flush(tableName);
      LOG.info("Finished loading {} rows into {}", num, tableName);
    } catch (Exception e) {
      LOG.error("MOB file compaction chore test FAILED", e);
      fail("MOB file compaction chore test FAILED");
    }
  }

  @After
  public void tearDown() throws Exception {
    admin.disableTable(tableDescriptor.getTableName());
    admin.deleteTable(tableDescriptor.getTableName());
    HTU.shutdownMiniCluster();
  }

  @Test
  public void baseTestMobFileCompaction() throws InterruptedException, IOException {
    LOG.info("MOB compaction " + description() + " started");
    loadAndFlushThreeTimes(rows, table, famStr);
    mobCompact(tableDescriptor, familyDescriptor);
    assertEquals("Should have 4 MOB files per region due to 3xflush + compaction.", numRegions * 4,
      getNumberOfMobFiles(table, famStr));
    cleanupAndVerifyCounts(table, famStr, 3 * rows);
    LOG.info("MOB compaction " + description() + " finished OK");
  }

  @Test
  public void testMobFileCompactionAfterSnapshotClone() throws InterruptedException, IOException {
    final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) + "-clone");
    LOG.info("MOB compaction of cloned snapshot, " + description() + " started");
    loadAndFlushThreeTimes(rows, table, famStr);
    LOG.debug("Taking snapshot and cloning table {}", table);
    admin.snapshot(TestMobUtils.getTableName(test), table);
    admin.cloneSnapshot(TestMobUtils.getTableName(test), clone);
    assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
      getNumberOfMobFiles(clone, famStr));
    mobCompact(admin.getDescriptor(clone), familyDescriptor);
    assertEquals("Should have 3 hlinks + 1 MOB file per region due to clone + compact",
      4 * numRegions, getNumberOfMobFiles(clone, famStr));
    cleanupAndVerifyCounts(clone, famStr, 3 * rows);
    LOG.info("MOB compaction of cloned snapshot, " + description() + " finished OK");
  }

  @Test
  public void testMobFileCompactionAfterSnapshotCloneAndFlush()
    throws InterruptedException, IOException {
    final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) + "-clone");
    LOG.info("MOB compaction of cloned snapshot after flush, " + description() + " started");
    loadAndFlushThreeTimes(rows, table, famStr);
    LOG.debug("Taking snapshot and cloning table {}", table);
    admin.snapshot(TestMobUtils.getTableName(test), table);
    admin.cloneSnapshot(TestMobUtils.getTableName(test), clone);
    assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
      getNumberOfMobFiles(clone, famStr));
    loadAndFlushThreeTimes(rows, clone, famStr);
    mobCompact(admin.getDescriptor(clone), familyDescriptor);
    assertEquals("Should have 7 MOB files per region due to clone + 3xflush + compact",
      7 * numRegions, getNumberOfMobFiles(clone, famStr));
    cleanupAndVerifyCounts(clone, famStr, 6 * rows);
    LOG.info("MOB compaction of cloned snapshot with flush, " + description() + " finished OK");
  }

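  /**
   * Loads {@code rows} rows three times, flushing after each load, and asserts that the MOB
   * directory grew by three files per region.
   */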
  protected void loadAndFlushThreeTimes(int rows, TableName table, String family)
    throws IOException {
    final long start = getNumberOfMobFiles(table, family);
    // Load and flush data 3 times
    loadData(table, rows);
    loadData(table, rows);
    loadData(table, rows);
    assertEquals("Should have 3 more mob files per region from flushing.", start + numRegions * 3,
      getNumberOfMobFiles(table, family));
  }

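  /**
   * Returns a short description of the compaction mode under test, used in log messages; intended
   * to be overridden by subclasses that exercise other modes.
   */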
  protected String description() {
    return "regular mode";
  }

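  /** Turns the compaction switch on for every region server in the cluster. */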
  protected void enableCompactions() throws IOException {
    final List<String> serverList =
      admin.getRegionServers().stream().map(sn -> sn.getServerName()).collect(Collectors.toList());
    admin.compactionSwitch(true, serverList);
  }

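  /** Turns the compaction switch off for every region server in the cluster. */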
  protected void disableCompactions() throws IOException {
    final List<String> serverList =
      admin.getRegionServers().stream().map(sn -> sn.getServerName()).collect(Collectors.toList());
    admin.compactionSwitch(false, serverList);
  }

  /**
   * Compacts the given table and returns once compaction is done. May presume compactions are
   * disabled when called, and must ensure compactions are disabled again before returning.
   */
  protected void mobCompact(TableDescriptor tableDescriptor,
    ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
    LOG.debug("Major compact MOB table " + tableDescriptor.getTableName());
    enableCompactions();
    mobCompactImpl(tableDescriptor, familyDescriptor);
    waitUntilCompactionIsComplete(tableDescriptor.getTableName());
    disableCompactions();
  }

  /**
   * Calls the compaction API specific to the test variant. Should not wait for compactions to
   * finish and may assume compactions are enabled when called.
   */
  protected void mobCompactImpl(TableDescriptor tableDescriptor,
    ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
    admin.majorCompact(tableDescriptor.getTableName(), familyDescriptor.getName());
  }

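  /**
   * Polls the compaction state of the given table every 100 ms until it returns to
   * {@link CompactionState#NONE}.
   */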
  protected void waitUntilCompactionIsComplete(TableName table)
    throws IOException, InterruptedException {
    CompactionState state = admin.getCompactionState(table);
    while (state != CompactionState.NONE) {
      LOG.debug("Waiting for compaction on {} to complete. current state {}", table, state);
      Thread.sleep(100);
      state = admin.getCompactionState(table);
    }
    LOG.debug("done waiting for compaction on {}", table);
  }

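  /**
   * Waits past the minimum age to archive, runs the MOB file cleaner chore on every region server,
   * then verifies that one MOB file per region remains and that the expected number of rows is
   * still readable.
   */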
  protected void cleanupAndVerifyCounts(TableName table, String family, int rows)
    throws InterruptedException, IOException {
    // We are guaranteed that the compacted file discharger will run during this pause,
    // because its interval is shorter than this wait time
    LOG.info("Waiting for {}ms", minAgeToArchive + 1000);

    Thread.sleep(minAgeToArchive + 1000);
    LOG.info("Cleaning up MOB files");

    // run the cleaner chore on each RS
    for (ServerName sn : admin.getRegionServers()) {
      HTU.getMiniHBaseCluster().getRegionServer(sn).getRSMobFileCleanerChore().chore();
    }

    assertEquals("After cleaning, we should have 1 MOB file per region based on size.", numRegions,
      getNumberOfMobFiles(table, family));

    LOG.debug("checking count of rows");
    long scanned = scanTable(table);
    assertEquals("Got the wrong number of rows in table " + table + " cf " + family, rows, scanned);
  }

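  /** Returns the number of files currently present under the table's MOB family directory. */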
  protected long getNumberOfMobFiles(TableName tableName, String family) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
    FileStatus[] stat = fs.listStatus(dir);
    for (FileStatus st : stat) {
      LOG.debug("MOB Directory content: {}", st.getPath());
    }
    LOG.debug("MOB Directory content total files: {}", stat.length);

    return stat.length;
  }

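  /**
   * Scans the whole table, asserting that every row carries the expected MOB value, and returns
   * the number of rows seen.
   */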
  protected long scanTable(TableName tableName) {
    try (final Table table = HTU.getConnection().getTable(tableName);
      final ResultScanner scanner = table.getScanner(fam)) {
      Result result;
      long counter = 0;
      while ((result = scanner.next()) != null) {
        assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
        counter++;
      }
      return counter;
    } catch (Exception e) {
      LOG.error("MOB file compaction test FAILED", e);
      if (HTU != null) {
        fail(e.getMessage());
      } else {
        System.exit(-1);
      }
    }
    return 0;
  }
}