/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.mob.FaultyMobStoreCompactor;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFileCleanupUtil;
import org.apache.hadoop.hbase.mob.MobStoreEngine;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * An integration test to detect regressions in HBASE-22749. The test creates a MOB-enabled table
 * and runs the following tasks in parallel: loading data, running MOB compactions, and running the
 * MOB cleanup chore. Failure injection into the MOB compaction cycle is implemented via a specific
 * subclass of DefaultMobStoreCompactor, FaultyMobStoreCompactor. The probability of failure is
 * controlled by the command-line argument 'failprob'.
 * <p>
 * Sample usage:
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.IntegrationTestMobCompaction -Dservers=10 -Drows=1000000
 * -Dfailprob=0.2
 * </pre>
 *
 * @see <a href="https://issues.apache.org/jira/browse/HBASE-22749">HBASE-22749</a>
 */
@SuppressWarnings("deprecation")
@Category(IntegrationTests.class)
public class IntegrationTestMobCompaction extends IntegrationTestBase {
  protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestMobCompaction.class);

  protected static final String REGIONSERVER_COUNT_KEY = "servers";
  protected static final String ROWS_COUNT_KEY = "rows";
  protected static final String FAILURE_PROB_KEY = "failprob";

  protected static final int DEFAULT_REGIONSERVER_COUNT = 3;
  protected static final int DEFAULT_ROWS_COUNT = 5000000;
  protected static final double DEFAULT_FAILURE_PROB = 0.1;

  protected static int regionServerCount = DEFAULT_REGIONSERVER_COUNT;
  protected static long rowsToLoad = DEFAULT_ROWS_COUNT;
  protected static double failureProb = DEFAULT_FAILURE_PROB;

  protected static String famStr = "f1";
  protected static byte[] fam = Bytes.toBytes(famStr);
  protected static byte[] qualifier = Bytes.toBytes("q1");
  protected static long mobLen = 10;
  protected static byte[] mobVal = Bytes
    .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");

  private static Configuration conf;
  private static TableDescriptor tableDescriptor;
  private static ColumnFamilyDescriptor familyDescriptor;
  private static Admin admin;
  private static Table table = null;

  private static volatile boolean run = true;

  @Override
  @Before
  public void setUp() throws Exception {
    util = getTestingUtil(getConf());
    conf = util.getConfiguration();
    // Initialize with test-specific configuration values
    initConf(conf);
    regionServerCount = conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT);
    LOG.info("Initializing cluster with {} region servers.", regionServerCount);
    util.initializeCluster(regionServerCount);
    admin = util.getAdmin();

    createTestTable();

    LOG.info("Cluster initialized and ready");
  }

  private void createTestTable() throws IOException {
    // Create test table
    familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true)
      .setMobThreshold(mobLen).setMaxVersions(1).build();
    tableDescriptor = util.createModifyableTableDescriptor("testMobCompactTable")
      .setColumnFamily(familyDescriptor).build();
    table = util.createTable(tableDescriptor, null);
  }

  @After
  public void tearDown() throws IOException {
    LOG.info("Cleaning up after test.");
    if (util.isDistributedCluster()) {
      deleteTablesIfAny();
      // TODO
    }
    LOG.info("Restoring cluster.");
    util.restoreCluster();
    LOG.info("Cluster restored.");
  }

  @Override
  public void setUpMonkey() throws Exception {
    // Sorry, no Monkey
    String msg = "Chaos monkey is not supported";
    LOG.warn(msg);
    throw new IOException(msg);
  }

  private void deleteTablesIfAny() throws IOException {
    if (table != null) {
      util.deleteTableIfAny(table.getName());
    }
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    LOG.debug("Initializing/checking cluster has {} servers", regionServerCount);
    util.initializeCluster(regionServerCount);
    LOG.debug("Done initializing/checking cluster");
  }

  /** Returns status of CLI execution */
  @Override
  public int runTestFromCommandLine() throws Exception {
    testMobCompaction();
    return 0;
  }

  @Override
  public TableName getTablename() {
    // That is only valid when Monkey is CALM (no monkey)
    return null;
  }

  @Override
  protected Set<String> getColumnFamilies() {
    // That is only valid when Monkey is CALM (no monkey)
    return null;
  }

  @Override
  protected void addOptions() {
    addOptWithArg(REGIONSERVER_COUNT_KEY,
      "Total number of region servers. Default: '" + DEFAULT_REGIONSERVER_COUNT + "'");
    addOptWithArg(ROWS_COUNT_KEY,
      "Total number of data rows to load. Default: '" + DEFAULT_ROWS_COUNT + "'");
    addOptWithArg(FAILURE_PROB_KEY,
      "Probability of a failure of a region MOB compaction request. Default: '"
        + DEFAULT_FAILURE_PROB + "'");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);

    regionServerCount = Integer.parseInt(
      cmd.getOptionValue(REGIONSERVER_COUNT_KEY, Integer.toString(DEFAULT_REGIONSERVER_COUNT)));
    rowsToLoad =
      Long.parseLong(cmd.getOptionValue(ROWS_COUNT_KEY, Long.toString(DEFAULT_ROWS_COUNT)));
    failureProb = Double
      .parseDouble(cmd.getOptionValue(FAILURE_PROB_KEY, Double.toString(DEFAULT_FAILURE_PROB)));

    LOG.info(
      MoreObjects.toStringHelper("Parsed Options").add(REGIONSERVER_COUNT_KEY, regionServerCount)
        .add(ROWS_COUNT_KEY, rowsToLoad).add(FAILURE_PROB_KEY, failureProb).toString());
  }

  private static void initConf(Configuration conf) {
    // Tune the cluster for frequent flushes and compactions, and plug in the faulty MOB
    // compactor, which injects failures with the configured probability.
    conf.setInt("hfile.format.version", 3);
    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
    conf.setInt("hbase.client.retries.number", 100);
    conf.setInt("hbase.hregion.max.filesize", 200000000);
    conf.setInt("hbase.hregion.memstore.flush.size", 800000);
    conf.setInt("hbase.hstore.blockingStoreFiles", 150);
    conf.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800);
    conf.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800);
    conf.setDouble("hbase.mob.compaction.fault.probability", failureProb);
    conf.set(MobStoreEngine.MOB_COMPACTOR_CLASS_KEY, FaultyMobStoreCompactor.class.getName());
    conf.setBoolean("hbase.table.sanity.checks", false);
    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, 20000);
  }

  /** Periodically triggers major compaction of the test column family until the loader finishes. */
  static class MajorCompaction implements Runnable {

    @Override
    public void run() {
      while (run) {
        try {
          admin.majorCompact(tableDescriptor.getTableName(), fam);
          Thread.sleep(120000);
        } catch (Exception e) {
          LOG.error("MOB Stress Test FAILED", e);
          System.exit(-1);
        }
      }
    }
  }

  /** Periodically runs the MOB file cleanup chore while the loader is running. */
  static class CleanMobAndArchive implements Runnable {

    @Override
    public void run() {
      while (run) {
        try {
          LOG.info("MOB cleanup started ...");
          MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin);
          LOG.info("MOB cleanup finished");

          Thread.sleep(130000);
        } catch (Exception e) {
          LOG.warn("Exception in CleanMobAndArchive", e);
        }
      }
    }
  }

  /** Loads MOB-sized rows into the test table, then flushes and signals the other tasks to stop. */
  class WriteData implements Runnable {

    private long rows = -1;

    public WriteData(long rows) {
      this.rows = rows;
    }

    @Override
    public void run() {
      try {
        // BufferedMutator bm = admin.getConnection().getBufferedMutator(table.getName());
        // Put Operation
        for (int i = 0; i < rows; i++) {
          Put p = new Put(Bytes.toBytes(i));
          p.addColumn(fam, qualifier, mobVal);
          table.put(p);

          // bm.mutate(p);
          if (i % 10000 == 0) {
            LOG.info("LOADED=" + i);
            try {
              Thread.sleep(500);
            } catch (InterruptedException ee) {
              // Restore interrupt status
              Thread.currentThread().interrupt();
            }
          }
          if (i % 100000 == 0) {
            printStats(i);
          }
        }
        // bm.flush();
        admin.flush(table.getName());
        run = false;
      } catch (Exception e) {
        LOG.error("MOB Stress Test FAILED", e);
        System.exit(-1);
      }
    }
  }

  @Test
  public void testMobCompaction() throws InterruptedException, IOException {

    try {
      // Run the loader, MOB major compactions and the MOB cleanup chore concurrently
      // until the loader completes and clears the 'run' flag.
      Thread writeData = new Thread(new WriteData(rowsToLoad));
      writeData.start();

      Thread majorcompact = new Thread(new MajorCompaction());
      majorcompact.start();

      Thread cleaner = new Thread(new CleanMobAndArchive());
      cleaner.start();

      while (run) {
        Thread.sleep(1000);
      }

      // Log the current content of the MOB family directory
      getNumberOfMobFiles(conf, table.getName(), new String(fam, StandardCharsets.UTF_8));
      LOG.info("Waiting for write thread to finish ...");
      writeData.join();
      // Cleanup again
      MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin);

      if (util != null) {
        LOG.info("Archive cleaner started ...");
        // Call archive cleaner again
        util.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
        LOG.info("Archive cleaner finished");
      }

      scanTable();

    } finally {
      admin.disableTable(tableDescriptor.getTableName());
      admin.deleteTable(tableDescriptor.getTableName());
    }
    LOG.info("MOB Stress Test finished OK");
    printStats(rowsToLoad);
  }

  private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
    throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
    FileStatus[] stat = fs.listStatus(dir);
    for (FileStatus st : stat) {
      LOG.debug("MOB Directory content: {}", st.getPath());
    }
    LOG.debug("MOB Directory content total files: {}", stat.length);

    return stat.length;
  }

  public void printStats(long loaded) {
    LOG.info("MOB Stress Test: loaded=" + loaded + " compactions="
      + FaultyMobStoreCompactor.totalCompactions.get() + " major="
      + FaultyMobStoreCompactor.totalMajorCompactions.get() + " mob="
      + FaultyMobStoreCompactor.mobCounter.get() + " injected failures="
      + FaultyMobStoreCompactor.totalFailures.get());
  }

  private void scanTable() {
    try {
      Result result;
      ResultScanner scanner = table.getScanner(fam);
      int counter = 0;
      while ((result = scanner.next()) != null) {
        assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
        if (counter % 10000 == 0) {
          LOG.info("GET=" + counter);
        }
        counter++;
      }
      assertEquals(rowsToLoad, counter);
    } catch (Exception e) {
      e.printStackTrace();
      LOG.error("MOB Stress Test FAILED");
      if (util != null) {
        assertTrue(false);
      } else {
        System.exit(-1);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    initConf(conf);
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestMobCompaction(), args);
    System.exit(status);
  }
}