/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.mob;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY;
021import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
022import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
023import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertTrue;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collection;
031import java.util.HashSet;
032import java.util.List;
033import java.util.Map;
034import java.util.Optional;
035import java.util.Set;
036import java.util.UUID;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.FileStatus;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.Path;
041import org.apache.hadoop.hbase.Cell;
042import org.apache.hadoop.hbase.ExtendedCell;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseTestingUtil;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.KeyValue;
047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
049import org.apache.hadoop.hbase.client.Delete;
050import org.apache.hadoop.hbase.client.Durability;
051import org.apache.hadoop.hbase.client.Put;
052import org.apache.hadoop.hbase.client.RegionInfo;
053import org.apache.hadoop.hbase.client.RegionInfoBuilder;
054import org.apache.hadoop.hbase.client.Scan;
055import org.apache.hadoop.hbase.client.Table;
056import org.apache.hadoop.hbase.client.TableDescriptor;
057import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
058import org.apache.hadoop.hbase.io.hfile.CacheConfig;
059import org.apache.hadoop.hbase.io.hfile.HFile;
060import org.apache.hadoop.hbase.io.hfile.HFileContext;
061import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
062import org.apache.hadoop.hbase.regionserver.BloomType;
063import org.apache.hadoop.hbase.regionserver.HRegion;
064import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
065import org.apache.hadoop.hbase.regionserver.HStore;
066import org.apache.hadoop.hbase.regionserver.HStoreFile;
067import org.apache.hadoop.hbase.regionserver.InternalScanner;
068import org.apache.hadoop.hbase.regionserver.RegionAsTable;
069import org.apache.hadoop.hbase.regionserver.StoreContext;
070import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
071import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
072import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
073import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
074import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
075import org.apache.hadoop.hbase.security.User;
076import org.apache.hadoop.hbase.testclassification.MediumTests;
077import org.apache.hadoop.hbase.util.Bytes;
078import org.apache.hadoop.hbase.util.CommonFSUtils;
079import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
080import org.apache.hadoop.hbase.util.Pair;
081import org.junit.After;
082import org.junit.ClassRule;
083import org.junit.Rule;
084import org.junit.Test;
085import org.junit.experimental.categories.Category;
086import org.junit.rules.TestName;
087import org.junit.runner.RunWith;
088import org.junit.runners.Parameterized;
089import org.slf4j.Logger;
090import org.slf4j.LoggerFactory;
091
092/**
093 * Test mob store compaction
094 */
095@RunWith(Parameterized.class)
096@Category(MediumTests.class)
097public class TestMobStoreCompaction {
098
099  @ClassRule
100  public static final HBaseClassTestRule CLASS_RULE =
101    HBaseClassTestRule.forClass(TestMobStoreCompaction.class);
102
103  @Rule
104  public TestName name = new TestName();
105  static final Logger LOG = LoggerFactory.getLogger(TestMobStoreCompaction.class.getName());
106  private final static HBaseTestingUtil UTIL = new HBaseTestingUtil();
107  private Configuration conf = null;
108
109  private HRegion region = null;
110  private TableDescriptor tableDescriptor = null;
111  private ColumnFamilyDescriptor familyDescriptor = null;
112  private long mobCellThreshold = 1000;
113
114  private FileSystem fs;
115
116  private static final byte[] COLUMN_FAMILY = fam1;
117  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
118  private int compactionThreshold;
119
120  private Boolean useFileBasedSFT;
121
122  public TestMobStoreCompaction(Boolean useFileBasedSFT) {
123    this.useFileBasedSFT = useFileBasedSFT;
124  }
125
126  @Parameterized.Parameters
127  public static Collection<Boolean> data() {
128    Boolean[] data = { false, true };
129    return Arrays.asList(data);
130  }
131
132  private void init(Configuration conf, long mobThreshold) throws Exception {
133    if (useFileBasedSFT) {
134      conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
135        "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
136    }
137
138    this.conf = conf;
139    this.mobCellThreshold = mobThreshold;
140
141    HBaseTestingUtil UTIL = new HBaseTestingUtil(conf);
142
143    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
144    familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_FAMILY).setMobEnabled(true)
145      .setMobThreshold(mobThreshold).setMaxVersions(1).build();
146    tableDescriptor = UTIL.createModifyableTableDescriptor(TestMobUtils.getTableName(name))
147      .modifyColumnFamily(familyDescriptor).build();
148
149    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
150    region = HBaseTestingUtil.createRegionAndWAL(regionInfo, UTIL.getDataTestDir(), conf,
151      tableDescriptor, new MobFileCache(conf));
152    fs = FileSystem.get(conf);
153  }
154
155  @After
156  public void tearDown() throws Exception {
157    region.close();
158    fs.delete(UTIL.getDataTestDir(), true);
159  }
160
161  /**
162   * During compaction, cells smaller than the threshold won't be affected.
163   */
164  @Test
165  public void testSmallerValue() throws Exception {
166    init(UTIL.getConfiguration(), 500);
167    byte[] dummyData = makeDummyData(300); // smaller than mob threshold
168    Table loader = new RegionAsTable(region);
169    // one hfile per row
170    for (int i = 0; i < compactionThreshold; i++) {
171      Put p = createPut(i, dummyData);
172      loader.put(p);
173      region.flush(true);
174    }
175    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
176    assertEquals("Before compaction: mob file count", 0, countMobFiles());
177    assertEquals("Before compaction: rows", compactionThreshold, UTIL.countRows(region));
178    assertEquals("Before compaction: mob rows", 0, countMobRows());
179
180    region.compactStores();
181
182    assertEquals("After compaction: store files", 1, countStoreFiles());
183    assertEquals("After compaction: mob file count", 0, countMobFiles());
184    assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
185    assertEquals("After compaction: rows", compactionThreshold, UTIL.countRows(region));
186    assertEquals("After compaction: mob rows", 0, countMobRows());
187  }
188
189  /**
190   * During compaction, the mob threshold size is changed.
191   */
192  @Test
193  public void testLargerValue() throws Exception {
194    init(UTIL.getConfiguration(), 200);
195    byte[] dummyData = makeDummyData(300); // larger than mob threshold
196    Table loader = new RegionAsTable(region);
197    for (int i = 0; i < compactionThreshold; i++) {
198      Put p = createPut(i, dummyData);
199      loader.put(p);
200      region.flush(true);
201    }
202    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
203    assertEquals("Before compaction: mob file count", compactionThreshold, countMobFiles());
204    assertEquals("Before compaction: rows", compactionThreshold, UTIL.countRows(region));
205    assertEquals("Before compaction: mob rows", compactionThreshold, countMobRows());
206    assertEquals("Before compaction: number of mob cells", compactionThreshold,
207      countMobCellsInMetadata());
208    // Change the threshold larger than the data size
209    setMobThreshold(region, COLUMN_FAMILY, 500);
210    region.initialize();
211
212    List<HStore> stores = region.getStores();
213    for (HStore store : stores) {
214      // Force major compaction
215      store.triggerMajorCompaction();
216      Optional<CompactionContext> context = store.requestCompaction(HStore.PRIORITY_USER,
217        CompactionLifeCycleTracker.DUMMY, User.getCurrent());
218      if (!context.isPresent()) {
219        continue;
220      }
221      region.compact(context.get(), store, NoLimitThroughputController.INSTANCE, User.getCurrent());
222    }
223
224    assertEquals("After compaction: store files", 1, countStoreFiles());
225    assertEquals("After compaction: mob file count", compactionThreshold, countMobFiles());
226    assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
227    assertEquals("After compaction: rows", compactionThreshold, UTIL.countRows(region));
228    assertEquals("After compaction: mob rows", 0, countMobRows());
229  }
230
231  private static HRegion setMobThreshold(HRegion region, byte[] cfName, long modThreshold) {
232    ColumnFamilyDescriptor cfd =
233      ColumnFamilyDescriptorBuilder.newBuilder(region.getTableDescriptor().getColumnFamily(cfName))
234        .setMobThreshold(modThreshold).build();
235    TableDescriptor td = TableDescriptorBuilder.newBuilder(region.getTableDescriptor())
236      .removeColumnFamily(cfName).setColumnFamily(cfd).build();
237    region.setTableDescriptor(td);
238    return region;
239  }
240
241  /**
242   * This test will first generate store files, then bulk load them and trigger the compaction. When
243   * compaction, the cell value will be larger than the threshold.
244   */
245  @Test
246  public void testMobCompactionWithBulkload() throws Exception {
247    // The following will produce store files of 600.
248    init(UTIL.getConfiguration(), 300);
249    byte[] dummyData = makeDummyData(600);
250
251    Path hbaseRootDir = CommonFSUtils.getRootDir(conf);
252    Path basedir = new Path(hbaseRootDir, tableDescriptor.getTableName().getNameAsString());
253    List<Pair<byte[], String>> hfiles = new ArrayList<>(1);
254    for (int i = 0; i < compactionThreshold; i++) {
255      Path hpath = new Path(basedir, UUID.randomUUID().toString().replace("-", ""));
256      hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
257      createHFile(hpath, i, dummyData);
258    }
259
260    // The following will bulk load the above generated store files and compact, with 600(fileSize)
261    // > 300(threshold)
262    Map<byte[], List<Path>> map = region.bulkLoadHFiles(hfiles, true, null);
263    assertTrue("Bulkload result:", !map.isEmpty());
264    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
265    assertEquals("Before compaction: mob file count", 0, countMobFiles());
266    assertEquals("Before compaction: rows", compactionThreshold, UTIL.countRows(region));
267    assertEquals("Before compaction: mob rows", 0, countMobRows());
268    assertEquals("Before compaction: referenced mob file count", 0, countReferencedMobFiles());
269
270    region.compactStores();
271
272    assertEquals("After compaction: store files", 1, countStoreFiles());
273    assertEquals("After compaction: mob file count:", 1, countMobFiles());
274    assertEquals("After compaction: rows", compactionThreshold, UTIL.countRows(region));
275    assertEquals("After compaction: mob rows", compactionThreshold, countMobRows());
276    assertEquals("After compaction: referenced mob file count", 1, countReferencedMobFiles());
277    assertEquals("After compaction: number of mob cells", compactionThreshold,
278      countMobCellsInMetadata());
279  }
280
281  @Test
282  public void testMajorCompactionAfterDelete() throws Exception {
283    init(UTIL.getConfiguration(), 100);
284    byte[] dummyData = makeDummyData(200); // larger than mob threshold
285    Table loader = new RegionAsTable(region);
286    // create hfiles and mob hfiles but don't trigger compaction
287    int numHfiles = compactionThreshold - 1;
288    byte[] deleteRow = Bytes.add(STARTROW, Bytes.toBytes(0));
289    for (int i = 0; i < numHfiles; i++) {
290      Put p = createPut(i, dummyData);
291      loader.put(p);
292      region.flush(true);
293    }
294    assertEquals("Before compaction: store files", numHfiles, countStoreFiles());
295    assertEquals("Before compaction: mob file count", numHfiles, countMobFiles());
296    assertEquals("Before compaction: rows", numHfiles, UTIL.countRows(region));
297    assertEquals("Before compaction: mob rows", numHfiles, countMobRows());
298    assertEquals("Before compaction: number of mob cells", numHfiles, countMobCellsInMetadata());
299    // now let's delete some cells that contain mobs
300    Delete delete = new Delete(deleteRow);
301    delete.addFamily(COLUMN_FAMILY);
302    region.delete(delete);
303    region.flush(true);
304
305    assertEquals("Before compaction: store files", numHfiles + 1, countStoreFiles());
306    assertEquals("Before compaction: mob files", numHfiles, countMobFiles());
307    // region.compactStores();
308    region.compact(true);
309    assertEquals("After compaction: store files", 1, countStoreFiles());
310  }
311
312  private int countStoreFiles() throws IOException {
313    HStore store = region.getStore(COLUMN_FAMILY);
314    return store.getStorefilesCount();
315  }
316
317  private int countMobFiles() throws IOException {
318    Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
319      familyDescriptor.getNameAsString());
320    if (fs.exists(mobDirPath)) {
321      FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
322      return files.length;
323    }
324    return 0;
325  }
326
327  private long countMobCellsInMetadata() throws IOException {
328    long mobCellsCount = 0;
329    Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
330      familyDescriptor.getNameAsString());
331    Configuration copyOfConf = new Configuration(conf);
332    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
333    CacheConfig cacheConfig = new CacheConfig(copyOfConf);
334    if (fs.exists(mobDirPath)) {
335      // TODO: use sft.load() api here
336      HRegionFileSystem regionFs = HRegionFileSystem.create(copyOfConf, fs,
337        MobUtils.getMobTableDir(copyOfConf, tableDescriptor.getTableName()),
338        region.getRegionInfo());
339      StoreFileTracker sft = StoreFileTrackerFactory.create(copyOfConf, false,
340        StoreContext.getBuilder().withColumnFamilyDescriptor(familyDescriptor)
341          .withFamilyStoreDirectoryPath(mobDirPath).withCacheConfig(cacheConfig)
342          .withRegionFileSystem(regionFs).build());
343      FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
344      for (FileStatus file : files) {
345        HStoreFile sf =
346          new HStoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true, sft);
347        sf.initReader();
348        Map<byte[], byte[]> fileInfo = sf.getReader().loadFileInfo();
349        byte[] count = fileInfo.get(MOB_CELLS_COUNT);
350        assertTrue(count != null);
351        mobCellsCount += Bytes.toLong(count);
352      }
353    }
354    return mobCellsCount;
355  }
356
357  private Put createPut(int rowIdx, byte[] dummyData) throws IOException {
358    Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)));
359    p.setDurability(Durability.SKIP_WAL);
360    p.addColumn(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
361    return p;
362  }
363
364  /**
365   * Create an HFile with the given number of bytes
366   */
367  private void createHFile(Path path, int rowIdx, byte[] dummyData) throws IOException {
368    HFileContext meta = new HFileContextBuilder().build();
369    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
370      .withFileContext(meta).create();
371    long now = EnvironmentEdgeManager.currentTime();
372    try {
373      KeyValue kv = new KeyValue(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)), COLUMN_FAMILY,
374        Bytes.toBytes("colX"), now, dummyData);
375      writer.append(kv);
376    } finally {
377      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
378      writer.close();
379    }
380  }
381
382  private int countMobRows() throws IOException {
383    Scan scan = new Scan();
384    // Do not retrieve the mob data when scanning
385    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
386    InternalScanner scanner = region.getScanner(scan);
387
388    int scannedCount = 0;
389    List<ExtendedCell> results = new ArrayList<>();
390    boolean hasMore = true;
391    while (hasMore) {
392      hasMore = scanner.next(results);
393      for (ExtendedCell c : results) {
394        if (MobUtils.isMobReferenceCell(c)) {
395          scannedCount++;
396        }
397      }
398      results.clear();
399    }
400    scanner.close();
401
402    return scannedCount;
403  }
404
405  private byte[] makeDummyData(int size) {
406    byte[] dummyData = new byte[size];
407    Bytes.random(dummyData);
408    return dummyData;
409  }
410
411  private int countReferencedMobFiles() throws IOException {
412    Scan scan = new Scan();
413    // Do not retrieve the mob data when scanning
414    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
415    InternalScanner scanner = region.getScanner(scan);
416
417    List<ExtendedCell> kvs = new ArrayList<>();
418    boolean hasMore = true;
419    String fileName;
420    Set<String> files = new HashSet<>();
421    do {
422      kvs.clear();
423      hasMore = scanner.next(kvs);
424      for (Cell kv : kvs) {
425        if (!MobUtils.isMobReferenceCell((ExtendedCell) kv)) {
426          continue;
427        }
428        if (!MobUtils.hasValidMobRefCellValue(kv)) {
429          continue;
430        }
431        int size = MobUtils.getMobValueLength(kv);
432        if (size <= mobCellThreshold) {
433          continue;
434        }
435        fileName = MobUtils.getMobFileName(kv);
436        if (fileName.isEmpty()) {
437          continue;
438        }
439        files.add(fileName);
440        Path familyPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
441          familyDescriptor.getNameAsString());
442        assertTrue(fs.exists(new Path(familyPath, fileName)));
443      }
444    } while (hasMore);
445
446    scanner.close();
447
448    return files.size();
449  }
450
451}