001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.Collection;
027import java.util.List;
028import java.util.concurrent.CompletableFuture;
029import java.util.stream.Collectors;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileStatus;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Admin;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
041import org.apache.hadoop.hbase.client.CompactionState;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.RegionInfo;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.ResultScanner;
046import org.apache.hadoop.hbase.client.Table;
047import org.apache.hadoop.hbase.client.TableDescriptor;
048import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
049import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
050import org.apache.hadoop.hbase.regionserver.HRegion;
051import org.apache.hadoop.hbase.regionserver.HStore;
052import org.apache.hadoop.hbase.regionserver.HStoreFile;
053import org.apache.hadoop.hbase.testclassification.MediumTests;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.junit.After;
056import org.junit.Before;
057import org.junit.ClassRule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063/**
064 * Mob file cleaner chore test. 1. Creates MOB table 2. Load MOB data and flushes it N times 3. Runs
065 * major MOB compaction 4. Verifies that number of MOB files in a mob directory is N+1 5. Waits for
066 * a period of time larger than minimum age to archive 6. Runs Mob cleaner chore 7 Verifies that
067 * every old MOB file referenced from current RS was archived
068 */
069@Category(MediumTests.class)
070public class TestRSMobFileCleanerChore {
071  private static final Logger LOG = LoggerFactory.getLogger(TestRSMobFileCleanerChore.class);
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074    HBaseClassTestRule.forClass(TestRSMobFileCleanerChore.class);
075
076  private HBaseTestingUtil HTU;
077
078  private final static String famStr = "f1";
079  private final static byte[] fam = Bytes.toBytes(famStr);
080  private final static byte[] qualifier = Bytes.toBytes("q1");
081  private final static long mobLen = 10;
082  private final static byte[] mobVal = Bytes
083    .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");
084
085  private Configuration conf;
086  private TableDescriptor tableDescriptor;
087  private ColumnFamilyDescriptor familyDescriptor;
088  private Admin admin;
089  private Table table = null;
090  private RSMobFileCleanerChore chore;
091  private long minAgeToArchive = 10000;
092
093  public TestRSMobFileCleanerChore() {
094  }
095
096  @Before
097  public void setUp() throws Exception {
098    HTU = new HBaseTestingUtil();
099    conf = HTU.getConfiguration();
100
101    initConf();
102
103    HTU.startMiniCluster();
104    admin = HTU.getAdmin();
105    familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true)
106      .setMobThreshold(mobLen).setMaxVersions(1).build();
107    tableDescriptor = HTU.createModifyableTableDescriptor("testMobCompactTable")
108      .setColumnFamily(familyDescriptor).build();
109    table = HTU.createTable(tableDescriptor, Bytes.toByteArrays("1"));
110  }
111
112  private void initConf() {
113
114    conf.setInt("hfile.format.version", 3);
115    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
116    conf.setInt("hbase.client.retries.number", 100);
117    conf.setInt("hbase.hregion.max.filesize", 200000000);
118    conf.setInt("hbase.hregion.memstore.flush.size", 800000);
119    conf.setInt("hbase.hstore.blockingStoreFiles", 150);
120    conf.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800);
121    conf.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800);
122    // conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY,
123    // FaultyMobStoreCompactor.class.getName());
124    // Disable automatic MOB compaction
125    conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
126    // Disable automatic MOB file cleaner chore
127    conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
128    // Set minimum age to archive to 10 sec
129    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive);
130    // Set compacted file discharger interval to a half minAgeToArchive
131    conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2);
132  }
133
134  private void loadData(Table t, int start, int num) {
135    try {
136
137      for (int i = 0; i < num; i++) {
138        Put p = new Put(Bytes.toBytes(start + i));
139        p.addColumn(fam, qualifier, mobVal);
140        t.put(p);
141      }
142      admin.flush(t.getName());
143    } catch (Exception e) {
144      LOG.error("MOB file cleaner chore test FAILED", e);
145      assertTrue(false);
146    }
147  }
148
149  @After
150  public void tearDown() throws Exception {
151    admin.disableTable(tableDescriptor.getTableName());
152    admin.deleteTable(tableDescriptor.getTableName());
153    HTU.shutdownMiniCluster();
154  }
155
156  @Test
157  public void testMobFileCleanerChore() throws InterruptedException, IOException {
158    loadData(table, 0, 10);
159    loadData(table, 10, 10);
160    // loadData(20, 10);
161    long num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
162    assertEquals(2, num);
163    // Major compact
164    admin.majorCompact(tableDescriptor.getTableName(), fam);
165    // wait until compaction is complete
166    while (admin.getCompactionState(tableDescriptor.getTableName()) != CompactionState.NONE) {
167      Thread.sleep(100);
168    }
169
170    num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
171    assertEquals(3, num);
172    // We have guarantee, that compcated file discharger will run during this pause
173    // because it has interval less than this wait time
174    LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
175
176    Thread.sleep(minAgeToArchive + 1000);
177    LOG.info("Cleaning up MOB files");
178
179    ServerName serverUsed = null;
180    List<RegionInfo> serverRegions = null;
181    for (ServerName sn : admin.getRegionServers()) {
182      serverRegions = admin.getRegions(sn);
183      if (serverRegions != null && serverRegions.size() > 0) {
184        // filtering out non test table regions
185        serverRegions = serverRegions.stream().filter(r -> r.getTable() == table.getName())
186          .collect(Collectors.toList());
187        // if such one is found use this rs
188        if (serverRegions.size() > 0) {
189          serverUsed = sn;
190        }
191        break;
192      }
193    }
194
195    chore = HTU.getMiniHBaseCluster().getRegionServer(serverUsed).getRSMobFileCleanerChore();
196
197    chore.chore();
198
199    num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
200    assertEquals(3 - serverRegions.size(), num);
201
202    long scanned = scanTable();
203    assertEquals(20, scanned);
204
205    // creating a MOB file not referenced from the current RS
206    Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf, table.getName(),
207      familyDescriptor, "nonExistentRegion");
208
209    // verifying the new MOBfile is added
210    num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
211    assertEquals(4 - serverRegions.size(), num);
212
213    FileSystem fs = FileSystem.get(conf);
214    assertTrue(fs.exists(extraMOBFile));
215
216    LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
217
218    Thread.sleep(minAgeToArchive + 1000);
219    LOG.info("Cleaning up MOB files");
220
221    // running chore again
222    chore.chore();
223
224    // the chore should only archive old MOB files that were referenced from the current RS
225    // the unrelated MOB file is still there
226    num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
227    assertEquals(4 - serverRegions.size(), num);
228
229    assertTrue(fs.exists(extraMOBFile));
230
231    scanned = scanTable();
232    assertEquals(20, scanned);
233  }
234
235  @Test
236  public void testCleaningAndStoreFileReaderCreatedByOtherThreads()
237    throws IOException, InterruptedException {
238    TableName testTable = TableName.valueOf("testCleaningAndStoreFileReaderCreatedByOtherThreads");
239    ColumnFamilyDescriptor cfDesc = ColumnFamilyDescriptorBuilder.newBuilder(fam)
240      .setMobEnabled(true).setMobThreshold(mobLen).setMaxVersions(1).build();
241    TableDescriptor tDesc =
242      TableDescriptorBuilder.newBuilder(testTable).setColumnFamily(cfDesc).build();
243    admin.createTable(tDesc);
244    assertTrue(admin.tableExists(testTable));
245
246    // put some data
247    loadData(admin.getConnection().getTable(testTable), 0, 10);
248
249    HRegion region = HTU.getHBaseCluster().getRegions(testTable).get(0);
250    HStore store = region.getStore(fam);
251    Collection<HStoreFile> storeFiles = store.getStorefiles();
252    assertEquals(1, store.getStorefiles().size());
253    final HStoreFile sf = storeFiles.iterator().next();
254    assertNotNull(sf);
255    long mobFileNum = getNumberOfMobFiles(conf, testTable, new String(fam));
256    assertEquals(1, mobFileNum);
257
258    ServerName serverName = null;
259    for (ServerName sn : admin.getRegionServers()) {
260      boolean flag = admin.getRegions(sn).stream().anyMatch(
261        r -> r.getRegionNameAsString().equals(region.getRegionInfo().getRegionNameAsString()));
262      if (flag) {
263        serverName = sn;
264        break;
265      }
266    }
267    assertNotNull(serverName);
268    RSMobFileCleanerChore cleanerChore =
269      HTU.getHBaseCluster().getRegionServer(serverName).getRSMobFileCleanerChore();
270    CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
271      boolean readerIsNotNull = false;
272      try {
273        sf.initReader();
274        Thread.sleep(1000 * 10);
275        readerIsNotNull = sf.getReader() != null;
276        sf.closeStoreFile(true);
277      } catch (Exception e) {
278        LOG.error("We occur an exception", e);
279      }
280      return readerIsNotNull;
281    });
282    Thread.sleep(100);
283    // The StoreFileReader object was created by another thread
284    cleanerChore.chore();
285    Boolean readerIsNotNull = future.join();
286    assertTrue(readerIsNotNull);
287    admin.disableTable(testTable);
288    admin.deleteTable(testTable);
289  }
290
291  private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
292    throws IOException {
293    FileSystem fs = FileSystem.get(conf);
294    Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
295    FileStatus[] stat = fs.listStatus(dir);
296    for (FileStatus st : stat) {
297      LOG.debug("DDDD MOB Directory content: {} size={}", st.getPath(), st.getLen());
298    }
299    LOG.debug("MOB Directory content total files: {}", stat.length);
300
301    return stat.length;
302  }
303
304  private long scanTable() {
305    try {
306
307      Result result;
308      ResultScanner scanner = table.getScanner(fam);
309      long counter = 0;
310      while ((result = scanner.next()) != null) {
311        assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
312        counter++;
313      }
314      return counter;
315    } catch (Exception e) {
316      e.printStackTrace();
317      LOG.error("MOB file cleaner chore test FAILED");
318      if (HTU != null) {
319        assertTrue(false);
320      } else {
321        System.exit(-1);
322      }
323    }
324    return 0;
325  }
326}