/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.mob.FaultyMobStoreCompactor;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFileCleanupUtil;
import org.apache.hadoop.hbase.mob.MobStoreEngine;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * An integration test to detect regressions in HBASE-22749. The test creates a MOB-enabled table
 * and runs the following tasks in parallel: loading data, running MOB compactions, and running the
 * MOB cleanup chore. Failure injection into the MOB compaction cycle is implemented via a
 * dedicated sub-class of DefaultMobStoreCompactor, FaultyMobStoreCompactor. The probability of
 * failure is controlled by the command-line argument 'failprob'.
 * <p>
 * Sample usage:
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.IntegrationTestMobCompaction -Dservers=10 -Drows=1000000
 *   -Dfailprob=0.2
 * </pre>
 *
 * @see <a href="https://issues.apache.org/jira/browse/HBASE-22749">HBASE-22749</a>
 */
@SuppressWarnings("deprecation")
@Category(IntegrationTests.class)
public class IntegrationTestMobCompaction extends IntegrationTestBase {
  protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestMobCompaction.class);

  protected static final String REGIONSERVER_COUNT_KEY = "servers";
  protected static final String ROWS_COUNT_KEY = "rows";
  protected static final String FAILURE_PROB_KEY = "failprob";

  protected static final int DEFAULT_REGIONSERVER_COUNT = 3;
  protected static final int DEFAULT_ROWS_COUNT = 5000000;
  protected static final double DEFAULT_FAILURE_PROB = 0.1;

  protected static int regionServerCount = DEFAULT_REGIONSERVER_COUNT;
  protected static long rowsToLoad = DEFAULT_ROWS_COUNT;
  protected static double failureProb = DEFAULT_FAILURE_PROB;

  protected static String famStr = "f1";
  protected static byte[] fam = Bytes.toBytes(famStr);
  protected static byte[] qualifier = Bytes.toBytes("q1");
  protected static long mobLen = 10;
  protected static byte[] mobVal = Bytes
    .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");

  private static Configuration conf;
  private static HTableDescriptor hdt;
  private static HColumnDescriptor hcd;
  private static Admin admin;
  private static Table table = null;

  private static volatile boolean run = true;

  @Override
  @Before
  public void setUp() throws Exception {
    util = getTestingUtil(getConf());
    conf = util.getConfiguration();
    // Initialize with test-specific configuration values
    initConf(conf);
    regionServerCount = conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT);
    LOG.info("Initializing cluster with {} region servers.", regionServerCount);
    util.initializeCluster(regionServerCount);
    admin = util.getAdmin();

    createTestTable();

    LOG.info("Cluster initialized and ready");
  }

  private void createTestTable() throws IOException {
    // Create test table
    hdt = util.createTableDescriptor("testMobCompactTable");
    hcd = new HColumnDescriptor(fam);
    hcd.setMobEnabled(true);
    hcd.setMobThreshold(mobLen);
    hcd.setMaxVersions(1);
    hdt.addFamily(hcd);
    table = util.createTable(hdt, null);
  }

  @After
  public void tearDown() throws IOException {
    LOG.info("Cleaning up after test.");
    if (util.isDistributedCluster()) {
      deleteTablesIfAny();
      // TODO
    }
    LOG.info("Restoring cluster.");
    util.restoreCluster();
    LOG.info("Cluster restored.");
  }

  @Override
  public void setUpMonkey() throws Exception {
    // Sorry, no Monkey
    String msg = "Chaos monkey is not supported";
    LOG.warn(msg);
    throw new IOException(msg);
  }

  private void deleteTablesIfAny() throws IOException {
    if (table != null) {
      util.deleteTableIfAny(table.getName());
    }
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    LOG.debug("Initializing/checking cluster has {} servers", regionServerCount);
    util.initializeCluster(regionServerCount);
    LOG.debug("Done initializing/checking cluster");
  }

  /** Returns status of CLI execution */
  @Override
  public int runTestFromCommandLine() throws Exception {
    testMobCompaction();
    return 0;
  }

  @Override
  public TableName getTablename() {
    // That is only valid when Monkey is CALM (no monkey)
    return null;
  }

  @Override
  protected Set<String> getColumnFamilies() {
    // That is only valid when Monkey is CALM (no monkey)
    return null;
  }

  @Override
  protected void addOptions() {
    addOptWithArg(REGIONSERVER_COUNT_KEY,
      "Total number of region servers. Default: '" + DEFAULT_REGIONSERVER_COUNT + "'");
    addOptWithArg(ROWS_COUNT_KEY,
      "Total number of data rows to load. Default: '" + DEFAULT_ROWS_COUNT + "'");
    addOptWithArg(FAILURE_PROB_KEY,
      "Probability of a failure of a region MOB compaction request. Default: '"
        + DEFAULT_FAILURE_PROB + "'");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);

    regionServerCount = Integer.parseInt(
      cmd.getOptionValue(REGIONSERVER_COUNT_KEY, Integer.toString(DEFAULT_REGIONSERVER_COUNT)));
    rowsToLoad =
      Long.parseLong(cmd.getOptionValue(ROWS_COUNT_KEY, Long.toString(DEFAULT_ROWS_COUNT)));
    failureProb = Double
      .parseDouble(cmd.getOptionValue(FAILURE_PROB_KEY, Double.toString(DEFAULT_FAILURE_PROB)));

    LOG.info(
      MoreObjects.toStringHelper("Parsed Options").add(REGIONSERVER_COUNT_KEY, regionServerCount)
        .add(ROWS_COUNT_KEY, rowsToLoad).add(FAILURE_PROB_KEY, failureProb).toString());
  }

  private static void initConf(Configuration conf) {
    conf.setInt("hfile.format.version", 3);
    // TTL of 0 so archived HFiles become eligible for deletion immediately
    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
    conf.setInt("hbase.client.retries.number", 100);
    // Small region and memstore sizes to force frequent flushes and compactions
    conf.setInt("hbase.hregion.max.filesize", 200000000);
    conf.setInt("hbase.hregion.memstore.flush.size", 800000);
    conf.setInt("hbase.hstore.blockingStoreFiles", 150);
    conf.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800);
    conf.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800);
    // Use the faulty compactor to inject MOB compaction failures with the configured probability
    conf.setDouble("hbase.mob.compaction.fault.probability", failureProb);
    conf.set(MobStoreEngine.MOB_COMPACTOR_CLASS_KEY, FaultyMobStoreCompactor.class.getName());
    conf.setBoolean("hbase.table.sanity.checks", false);
    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, 20000);
  }

  static class MajorCompaction implements Runnable {

    @Override
    public void run() {
      while (run) {
        try {
          admin.majorCompact(hdt.getTableName(), fam);
          Thread.sleep(120000);
        } catch (Exception e) {
          LOG.error("MOB Stress Test FAILED", e);
          System.exit(-1);
        }
      }
    }
  }

  static class CleanMobAndArchive implements Runnable {

    @Override
    public void run() {
      while (run) {
        try {
          LOG.info("MOB cleanup started ...");
          MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin);
          LOG.info("MOB cleanup finished");

          Thread.sleep(130000);
        } catch (Exception e) {
          LOG.warn("Exception in CleanMobAndArchive", e);
        }
      }
    }
  }

  class WriteData implements Runnable {

    private long rows = -1;

    public WriteData(long rows) {
      this.rows = rows;
    }

    @Override
    public void run() {
      try {

        // BufferedMutator bm = admin.getConnection().getBufferedMutator(table.getName());
        // Put Operation
        for (int i = 0; i < rows; i++) {
          Put p = new Put(Bytes.toBytes(i));
          p.addColumn(fam, qualifier, mobVal);
          table.put(p);

          // bm.mutate(p);
          if (i % 10000 == 0) {
            LOG.info("LOADED=" + i);
            try {
              Thread.sleep(500);
            } catch (InterruptedException ee) {
              // Restore interrupt status
              Thread.currentThread().interrupt();
            }
          }
          if (i % 100000 == 0) {
            printStats(i);
          }
        }
        // bm.flush();
        admin.flush(table.getName());
        run = false;
      } catch (Exception e) {
        LOG.error("MOB Stress Test FAILED", e);
        System.exit(-1);
      }
    }
  }

  @Test
  public void testMobCompaction() throws InterruptedException, IOException {

    try {

      Thread writeData = new Thread(new WriteData(rowsToLoad));
      writeData.start();

      Thread majorcompact = new Thread(new MajorCompaction());
      majorcompact.start();

      Thread cleaner = new Thread(new CleanMobAndArchive());
      cleaner.start();

      while (run) {
        Thread.sleep(1000);
      }

      getNumberOfMobFiles(conf, table.getName(), new String(fam, StandardCharsets.UTF_8));
      LOG.info("Waiting for write thread to finish ...");
      writeData.join();
      // Cleanup again
      MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin);

      if (util != null) {
        LOG.info("Archive cleaner started ...");
        // Call archive cleaner again
        util.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
        LOG.info("Archive cleaner finished");
      }

      scanTable();

    } finally {
      admin.disableTable(hdt.getTableName());
      admin.deleteTable(hdt.getTableName());
    }
    LOG.info("MOB Stress Test finished OK");
    printStats(rowsToLoad);
  }

  private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
    throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
    FileStatus[] stat = fs.listStatus(dir);
    for (FileStatus st : stat) {
      LOG.debug("MOB Directory content: {}", st.getPath());
    }
    LOG.debug("MOB Directory content total files: {}", stat.length);

    return stat.length;
  }

  public void printStats(long loaded) {
    LOG.info("MOB Stress Test: loaded=" + loaded + " compactions="
      + FaultyMobStoreCompactor.totalCompactions.get() + " major="
      + FaultyMobStoreCompactor.totalMajorCompactions.get() + " mob="
      + FaultyMobStoreCompactor.mobCounter.get() + " injected failures="
      + FaultyMobStoreCompactor.totalFailures.get());
  }

  private void scanTable() {
    try {

      Result result;
      ResultScanner scanner = table.getScanner(fam);
      int counter = 0;
      while ((result = scanner.next()) != null) {
        assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
        if (counter % 10000 == 0) {
          LOG.info("GET=" + counter);
        }
        counter++;
      }
      assertEquals(rowsToLoad, counter);
    } catch (Exception e) {
      LOG.error("MOB Stress Test FAILED", e);
      if (util != null) {
        assertTrue(false);
      } else {
        System.exit(-1);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    initConf(conf);
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestMobCompaction(), args);
    System.exit(status);
  }
}