001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mob; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.Arrays; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileStatus; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.HBaseTestingUtility; 030import org.apache.hadoop.hbase.HColumnDescriptor; 031import org.apache.hadoop.hbase.HTableDescriptor; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.Admin; 034import org.apache.hadoop.hbase.client.Connection; 035import org.apache.hadoop.hbase.client.ConnectionFactory; 036import org.apache.hadoop.hbase.client.Put; 037import org.apache.hadoop.hbase.client.Result; 038import org.apache.hadoop.hbase.client.ResultScanner; 039import org.apache.hadoop.hbase.client.Table; 040import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045/** 046 * Reproduction for MOB data loss 1. Settings: Region Size 200 MB, Flush threshold 800 KB. 2. Insert 047 * 10 Million records 3. MOB Compaction and Archiver a) Trigger MOB Compaction (every 2 minutes) b) 048 * Trigger major compaction (every 2 minutes) c) Trigger archive cleaner (every 3 minutes) 4. 049 * Validate MOB data after complete data load. This class is used by MobStressTool only. This is not 050 * a unit test 051 */ 052@SuppressWarnings("deprecation") 053public class MobStressToolRunner { 054 private static final Logger LOG = LoggerFactory.getLogger(MobStressToolRunner.class); 055 056 private HBaseTestingUtility HTU; 057 058 private final static String famStr = "f1"; 059 private final static byte[] fam = Bytes.toBytes(famStr); 060 private final static byte[] qualifier = Bytes.toBytes("q1"); 061 private final static long mobLen = 10; 062 private final static byte[] mobVal = Bytes 063 .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789"); 064 065 private Configuration conf; 066 private HTableDescriptor hdt; 067 private HColumnDescriptor hcd; 068 private Admin admin; 069 private long count = 500000; 070 private double failureProb = 0.1; 071 private Table table = null; 072 073 private static volatile boolean run = true; 074 075 public MobStressToolRunner() { 076 077 } 078 079 public void init(Configuration conf, long numRows) throws IOException { 080 this.conf = conf; 081 this.count = numRows; 082 initConf(); 083 printConf(); 084 hdt = new HTableDescriptor(TableName.valueOf("testMobCompactTable")); 085 Connection conn = ConnectionFactory.createConnection(this.conf); 086 this.admin = conn.getAdmin(); 087 this.hcd = new HColumnDescriptor(fam); 088 this.hcd.setMobEnabled(true); 089 this.hcd.setMobThreshold(mobLen); 090 this.hcd.setMaxVersions(1); 091 this.hdt.addFamily(hcd); 092 if (admin.tableExists(hdt.getTableName())) { 093 admin.disableTable(hdt.getTableName()); 094 admin.deleteTable(hdt.getTableName()); 095 } 096 admin.createTable(hdt); 097 table = conn.getTable(hdt.getTableName()); 098 } 099 100 private void printConf() { 101 LOG.info("Please ensure the following HBase configuration is set:"); 102 LOG.info("hfile.format.version=3"); 103 LOG.info("hbase.master.hfilecleaner.ttl=0"); 104 LOG.info("hbase.hregion.max.filesize=200000000"); 105 LOG.info("hbase.client.retries.number=100"); 106 LOG.info("hbase.hregion.memstore.flush.size=800000"); 107 LOG.info("hbase.hstore.blockingStoreFiles=150"); 108 LOG.info("hbase.hstore.compaction.throughput.lower.bound=50000000"); 109 LOG.info("hbase.hstore.compaction.throughput.higher.bound=100000000"); 110 LOG.info("hbase.master.mob.cleaner.period=0"); 111 LOG.info("hbase.mob.default.compactor=org.apache.hadoop.hbase.mob.FaultyMobStoreCompactor"); 112 LOG.warn("hbase.mob.compaction.fault.probability=x, where x is between 0. and 1."); 113 114 } 115 116 private void initConf() { 117 118 conf.setInt("hfile.format.version", 3); 119 conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0); 120 conf.setInt("hbase.client.retries.number", 100); 121 conf.setInt("hbase.hregion.max.filesize", 200000000); 122 conf.setInt("hbase.hregion.memstore.flush.size", 800000); 123 conf.setInt("hbase.hstore.blockingStoreFiles", 150); 124 conf.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800); 125 conf.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800); 126 conf.setDouble("hbase.mob.compaction.fault.probability", failureProb); 127 // conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY, 128 // FaultyMobStoreCompactor.class.getName()); 129 conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0); 130 conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0); 131 conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, 120000); 132 conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE); 133 conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000); 134 135 } 136 137 class MajorCompaction implements Runnable { 138 139 @Override 140 public void run() { 141 while (run) { 142 try { 143 admin.majorCompact(hdt.getTableName(), fam); 144 Thread.sleep(120000); 145 } catch (Exception e) { 146 LOG.error("MOB Stress Test FAILED", e); 147 System.exit(-1); 148 } 149 } 150 } 151 } 152 153 class CleanMobAndArchive implements Runnable { 154 155 @Override 156 public void run() { 157 while (run) { 158 try { 159 LOG.info("MOB cleanup started ..."); 160 MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin); 161 LOG.info("MOB cleanup finished"); 162 163 Thread.sleep(130000); 164 } catch (Exception e) { 165 LOG.error("CleanMobAndArchive", e); 166 } 167 } 168 } 169 } 170 171 class WriteData implements Runnable { 172 173 private long rows = -1; 174 175 public WriteData(long rows) { 176 this.rows = rows; 177 } 178 179 @Override 180 public void run() { 181 try { 182 183 // Put Operation 184 for (int i = 0; i < rows; i++) { 185 byte[] key = Bytes.toBytes(i); 186 Put p = new Put(key); 187 p.addColumn(fam, qualifier, Bytes.add(key, mobVal)); 188 table.put(p); 189 if (i % 10000 == 0) { 190 LOG.info("LOADED=" + i); 191 try { 192 Thread.sleep(500); 193 } catch (InterruptedException ee) { 194 } 195 } 196 if (i % 100000 == 0) { 197 printStats(i); 198 } 199 } 200 admin.flush(table.getName()); 201 run = false; 202 } catch (Exception e) { 203 LOG.error("MOB Stress Test FAILED", e); 204 System.exit(-1); 205 } 206 } 207 } 208 209 public void runStressTest() throws InterruptedException, IOException { 210 211 try { 212 213 Thread writeData = new Thread(new WriteData(count)); 214 writeData.start(); 215 216 Thread majorcompact = new Thread(new MajorCompaction()); 217 majorcompact.start(); 218 219 Thread cleaner = new Thread(new CleanMobAndArchive()); 220 cleaner.start(); 221 222 while (run) { 223 Thread.sleep(1000); 224 } 225 226 getNumberOfMobFiles(conf, table.getName(), new String(fam)); 227 LOG.info("Waiting for write thread to finish ..."); 228 writeData.join(); 229 // Cleanup again 230 MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin); 231 getNumberOfMobFiles(conf, table.getName(), new String(fam)); 232 233 if (HTU != null) { 234 LOG.info("Archive cleaner started ..."); 235 // Call archive cleaner again 236 HTU.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting(); 237 LOG.info("Archive cleaner finished"); 238 } 239 240 scanTable(); 241 242 } finally { 243 244 admin.disableTable(hdt.getTableName()); 245 admin.deleteTable(hdt.getTableName()); 246 } 247 LOG.info("MOB Stress Test finished OK"); 248 printStats(count); 249 250 } 251 252 private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family) 253 throws IOException { 254 FileSystem fs = FileSystem.get(conf); 255 Path dir = MobUtils.getMobFamilyPath(conf, tableName, family); 256 FileStatus[] stat = fs.listStatus(dir); 257 long size = 0; 258 for (FileStatus st : stat) { 259 LOG.debug("MOB Directory content: {} len={}", st.getPath(), st.getLen()); 260 size += st.getLen(); 261 } 262 LOG.debug("MOB Directory content total files: {}, total size={}", stat.length, size); 263 264 return stat.length; 265 } 266 267 public void printStats(long loaded) { 268 LOG.info("MOB Stress Test: loaded=" + loaded + " compactions=" 269 + FaultyMobStoreCompactor.totalCompactions.get() + " major=" 270 + FaultyMobStoreCompactor.totalMajorCompactions.get() + " mob=" 271 + FaultyMobStoreCompactor.mobCounter.get() + " injected failures=" 272 + FaultyMobStoreCompactor.totalFailures.get()); 273 } 274 275 private void scanTable() { 276 try { 277 278 Result result; 279 ResultScanner scanner = table.getScanner(fam); 280 int counter = 0; 281 while ((result = scanner.next()) != null) { 282 byte[] key = result.getRow(); 283 assertTrue(Arrays.equals(result.getValue(fam, qualifier), Bytes.add(key, mobVal))); 284 if (counter % 10000 == 0) { 285 LOG.info("GET=" + counter + " key=" + Bytes.toInt(key)); 286 } 287 counter++; 288 } 289 290 assertEquals(count, counter); 291 } catch (Exception e) { 292 e.printStackTrace(); 293 LOG.error("MOB Stress Test FAILED"); 294 if (HTU != null) { 295 assertTrue(false); 296 } else { 297 System.exit(-1); 298 } 299 } 300 } 301}