001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mob; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import static org.junit.Assert.fail; 023 024import java.io.IOException; 025import java.util.Arrays; 026import java.util.Collection; 027import java.util.List; 028import java.util.Random; 029import java.util.stream.Collectors; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileStatus; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtility; 036import org.apache.hadoop.hbase.ServerName; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.Admin; 039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 041import org.apache.hadoop.hbase.client.CompactionState; 042import org.apache.hadoop.hbase.client.Put; 043import org.apache.hadoop.hbase.client.Result; 044import org.apache.hadoop.hbase.client.ResultScanner; 045import org.apache.hadoop.hbase.client.Table; 046import org.apache.hadoop.hbase.client.TableDescriptor; 047import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 048import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; 049import org.apache.hadoop.hbase.testclassification.LargeTests; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.RegionSplitter; 052import org.junit.After; 053import org.junit.Before; 054import org.junit.ClassRule; 055import org.junit.Rule; 056import org.junit.Test; 057import org.junit.experimental.categories.Category; 058import org.junit.rules.TestName; 059import org.junit.runner.RunWith; 060import org.junit.runners.Parameterized; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064/** 065 * Mob file compaction base test. 1. Enables batch mode for regular MOB compaction, Sets batch size 066 * to 7 regions. (Optional) 2. Disables periodic MOB compactions, sets minimum age to archive to 10 067 * sec 3. Creates MOB table with 20 regions 4. Loads MOB data (randomized keys, 1000 rows), flushes 068 * data. 5. Repeats 4. two more times 6. Verifies that we have 20 *3 = 60 mob files (equals to 069 * number of regions x 3) 7. Runs major MOB compaction. 8. Verifies that number of MOB files in a 070 * mob directory is 20 x4 = 80 9. Waits for a period of time larger than minimum age to archive 10. 071 * Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is 20. 12 Runs 072 * scanner and checks all 3 * 1000 rows. 073 */ 074@RunWith(Parameterized.class) 075@Category(LargeTests.class) 076public class TestMobCompactionWithDefaults { 077 private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionWithDefaults.class); 078 @ClassRule 079 public static final HBaseClassTestRule CLASS_RULE = 080 HBaseClassTestRule.forClass(TestMobCompactionWithDefaults.class); 081 082 protected static HBaseTestingUtility HTU; 083 protected static Configuration conf; 084 protected static long minAgeToArchive = 10000; 085 086 protected final static String famStr = "f1"; 087 protected final static byte[] fam = Bytes.toBytes(famStr); 088 protected final static byte[] qualifier = Bytes.toBytes("q1"); 089 protected final static long mobLen = 10; 090 protected final static byte[] mobVal = Bytes 091 .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789"); 092 093 @Rule 094 public TestName test = new TestName(); 095 protected TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor; 096 protected ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor; 097 protected Admin admin; 098 protected TableName table = null; 099 protected int numRegions = 20; 100 protected int rows = 1000; 101 102 protected Boolean useFileBasedSFT; 103 104 public TestMobCompactionWithDefaults(Boolean useFileBasedSFT) { 105 this.useFileBasedSFT = useFileBasedSFT; 106 } 107 108 @Parameterized.Parameters 109 public static Collection<Boolean> data() { 110 Boolean[] data = { false, true }; 111 return Arrays.asList(data); 112 } 113 114 protected void htuStart() throws Exception { 115 HTU = new HBaseTestingUtility(); 116 conf = HTU.getConfiguration(); 117 conf.setInt("hfile.format.version", 3); 118 // Disable automatic MOB compaction 119 conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0); 120 // Disable automatic MOB file cleaner chore 121 conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0); 122 // Set minimum age to archive to 10 sec 123 conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive); 124 // Set compacted file discharger interval to a half minAgeToArchive 125 conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2); 126 conf.setBoolean("hbase.regionserver.compaction.enabled", false); 127 if (useFileBasedSFT) { 128 conf.set(StoreFileTrackerFactory.TRACKER_IMPL, 129 "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker"); 130 } 131 additonalConfigSetup(); 132 HTU.startMiniCluster(); 133 } 134 135 protected void additonalConfigSetup() { 136 } 137 138 @Before 139 public void setUp() throws Exception { 140 htuStart(); 141 tableDescriptor = HTU.createModifyableTableDescriptor(TestMobUtils.getTableName(test)); 142 admin = HTU.getAdmin(); 143 familyDescriptor = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam); 144 familyDescriptor.setMobEnabled(true); 145 familyDescriptor.setMobThreshold(mobLen); 146 familyDescriptor.setMaxVersions(1); 147 tableDescriptor.setColumnFamily(familyDescriptor); 148 RegionSplitter.UniformSplit splitAlgo = new RegionSplitter.UniformSplit(); 149 byte[][] splitKeys = splitAlgo.split(numRegions); 150 table = HTU.createTable(tableDescriptor, splitKeys).getName(); 151 } 152 153 private void loadData(TableName tableName, int num) { 154 155 Random r = new Random(); 156 LOG.info("Started loading {} rows into {}", num, tableName); 157 try (final Table table = HTU.getConnection().getTable(tableName)) { 158 for (int i = 0; i < num; i++) { 159 byte[] key = new byte[32]; 160 r.nextBytes(key); 161 Put p = new Put(key); 162 p.addColumn(fam, qualifier, mobVal); 163 table.put(p); 164 } 165 admin.flush(tableName); 166 LOG.info("Finished loading {} rows into {}", num, tableName); 167 } catch (Exception e) { 168 LOG.error("MOB file compaction chore test FAILED", e); 169 fail("MOB file compaction chore test FAILED"); 170 } 171 } 172 173 @After 174 public void tearDown() throws Exception { 175 admin.disableTable(tableDescriptor.getTableName()); 176 admin.deleteTable(tableDescriptor.getTableName()); 177 HTU.shutdownMiniCluster(); 178 } 179 180 @Test 181 public void baseTestMobFileCompaction() throws InterruptedException, IOException { 182 LOG.info("MOB compaction " + description() + " started"); 183 loadAndFlushThreeTimes(rows, table, famStr); 184 mobCompact(tableDescriptor, familyDescriptor); 185 assertEquals("Should have 4 MOB files per region due to 3xflush + compaction.", numRegions * 4, 186 getNumberOfMobFiles(table, famStr)); 187 cleanupAndVerifyCounts(table, famStr, 3 * rows); 188 LOG.info("MOB compaction " + description() + " finished OK"); 189 } 190 191 @Test 192 public void testMobFileCompactionAfterSnapshotClone() throws InterruptedException, IOException { 193 final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) + "-clone"); 194 LOG.info("MOB compaction of cloned snapshot, " + description() + " started"); 195 loadAndFlushThreeTimes(rows, table, famStr); 196 LOG.debug("Taking snapshot and cloning table {}", table); 197 admin.snapshot(TestMobUtils.getTableName(test), table); 198 admin.cloneSnapshot(TestMobUtils.getTableName(test), clone); 199 assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions, 200 getNumberOfMobFiles(clone, famStr)); 201 mobCompact(admin.getDescriptor(clone), familyDescriptor); 202 assertEquals("Should have 3 hlinks + 1 MOB file per region due to clone + compact", 203 4 * numRegions, getNumberOfMobFiles(clone, famStr)); 204 cleanupAndVerifyCounts(clone, famStr, 3 * rows); 205 LOG.info("MOB compaction of cloned snapshot, " + description() + " finished OK"); 206 } 207 208 @Test 209 public void testMobFileCompactionAfterSnapshotCloneAndFlush() 210 throws InterruptedException, IOException { 211 final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test) + "-clone"); 212 LOG.info("MOB compaction of cloned snapshot after flush, " + description() + " started"); 213 loadAndFlushThreeTimes(rows, table, famStr); 214 LOG.debug("Taking snapshot and cloning table {}", table); 215 admin.snapshot(TestMobUtils.getTableName(test), table); 216 admin.cloneSnapshot(TestMobUtils.getTableName(test), clone); 217 assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions, 218 getNumberOfMobFiles(clone, famStr)); 219 loadAndFlushThreeTimes(rows, clone, famStr); 220 mobCompact(admin.getDescriptor(clone), familyDescriptor); 221 assertEquals("Should have 7 MOB file per region due to clone + 3xflush + compact", 222 7 * numRegions, getNumberOfMobFiles(clone, famStr)); 223 cleanupAndVerifyCounts(clone, famStr, 6 * rows); 224 LOG.info("MOB compaction of cloned snapshot w flush, " + description() + " finished OK"); 225 } 226 227 protected void loadAndFlushThreeTimes(int rows, TableName table, String family) 228 throws IOException { 229 final long start = getNumberOfMobFiles(table, family); 230 // Load and flush data 3 times 231 loadData(table, rows); 232 loadData(table, rows); 233 loadData(table, rows); 234 assertEquals("Should have 3 more mob files per region from flushing.", start + numRegions * 3, 235 getNumberOfMobFiles(table, family)); 236 } 237 238 protected String description() { 239 return "regular mode"; 240 } 241 242 protected void enableCompactions() throws IOException { 243 final List<String> serverList = 244 admin.getRegionServers().stream().map(sn -> sn.getServerName()).collect(Collectors.toList()); 245 admin.compactionSwitch(true, serverList); 246 } 247 248 protected void disableCompactions() throws IOException { 249 final List<String> serverList = 250 admin.getRegionServers().stream().map(sn -> sn.getServerName()).collect(Collectors.toList()); 251 admin.compactionSwitch(false, serverList); 252 } 253 254 /** 255 * compact the given table and return once it is done. should presume compactions are disabled 256 * when called. should ensure compactions are disabled before returning. 257 */ 258 protected void mobCompact(TableDescriptor tableDescriptor, 259 ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException { 260 LOG.debug("Major compact MOB table " + tableDescriptor.getTableName()); 261 enableCompactions(); 262 mobCompactImpl(tableDescriptor, familyDescriptor); 263 waitUntilCompactionIsComplete(tableDescriptor.getTableName()); 264 disableCompactions(); 265 } 266 267 /** 268 * Call the API for compaction specific to the test set. should not wait for compactions to 269 * finish. may assume compactions are enabled when called. 270 */ 271 protected void mobCompactImpl(TableDescriptor tableDescriptor, 272 ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException { 273 admin.majorCompact(tableDescriptor.getTableName(), familyDescriptor.getName()); 274 } 275 276 protected void waitUntilCompactionIsComplete(TableName table) 277 throws IOException, InterruptedException { 278 CompactionState state = admin.getCompactionState(table); 279 while (state != CompactionState.NONE) { 280 LOG.debug("Waiting for compaction on {} to complete. current state {}", table, state); 281 Thread.sleep(100); 282 state = admin.getCompactionState(table); 283 } 284 LOG.debug("done waiting for compaction on {}", table); 285 } 286 287 protected void cleanupAndVerifyCounts(TableName table, String family, int rows) 288 throws InterruptedException, IOException { 289 // We have guarantee, that compacted file discharger will run during this pause 290 // because it has interval less than this wait time 291 LOG.info("Waiting for {}ms", minAgeToArchive + 1000); 292 293 Thread.sleep(minAgeToArchive + 1000); 294 LOG.info("Cleaning up MOB files"); 295 296 // run cleaner chore on each RS 297 for (ServerName sn : admin.getRegionServers()) { 298 HTU.getMiniHBaseCluster().getRegionServer(sn).getRSMobFileCleanerChore().chore(); 299 } 300 301 assertEquals("After cleaning, we should have 1 MOB file per region based on size.", numRegions, 302 getNumberOfMobFiles(table, family)); 303 304 LOG.debug("checking count of rows"); 305 long scanned = scanTable(table); 306 assertEquals("Got the wrong number of rows in table " + table + " cf " + family, rows, scanned); 307 308 } 309 310 protected long getNumberOfMobFiles(TableName tableName, String family) throws IOException { 311 FileSystem fs = FileSystem.get(conf); 312 Path dir = MobUtils.getMobFamilyPath(conf, tableName, family); 313 FileStatus[] stat = fs.listStatus(dir); 314 for (FileStatus st : stat) { 315 LOG.debug("MOB Directory content: {}", st.getPath()); 316 } 317 LOG.debug("MOB Directory content total files: {}", stat.length); 318 319 return stat.length; 320 } 321 322 protected long scanTable(TableName tableName) { 323 try (final Table table = HTU.getConnection().getTable(tableName); 324 final ResultScanner scanner = table.getScanner(fam)) { 325 Result result; 326 long counter = 0; 327 while ((result = scanner.next()) != null) { 328 assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal)); 329 counter++; 330 } 331 return counter; 332 } catch (Exception e) { 333 LOG.error("MOB file compaction test FAILED", e); 334 if (HTU != null) { 335 fail(e.getMessage()); 336 } else { 337 System.exit(-1); 338 } 339 } 340 return 0; 341 } 342}