001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mob; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.Arrays; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileStatus; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.HBaseTestingUtil; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.client.Admin; 032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 034import org.apache.hadoop.hbase.client.Connection; 035import org.apache.hadoop.hbase.client.ConnectionFactory; 036import org.apache.hadoop.hbase.client.Put; 037import org.apache.hadoop.hbase.client.Result; 038import org.apache.hadoop.hbase.client.ResultScanner; 039import org.apache.hadoop.hbase.client.Table; 040import org.apache.hadoop.hbase.client.TableDescriptor; 041import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 042import 
org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reproduction for MOB data loss. The scenario it was written for:
 * <ol>
 * <li>Settings: Region Size 200 MB, Flush threshold 800 KB.</li>
 * <li>Insert 10 Million records.</li>
 * <li>MOB Compaction and Archiver: a) Trigger MOB Compaction (every 2 minutes), b) Trigger major
 * compaction (every 2 minutes), c) Trigger archive cleaner (every 3 minutes).</li>
 * <li>Validate MOB data after complete data load.</li>
 * </ol>
 * In this implementation the number of rows is passed to {@link #init(Configuration, long)}. While
 * a writer thread loads the rows, one background thread requests a major compaction of the MOB
 * family about every 120 s, and another cleans up obsolete MOB files about every 130 s. After the
 * load, every row is scanned back and compared with the expected value. This class is used by
 * MobStressTool only. This is not a unit test.
 */
public class MobStressToolRunner {
  private static final Logger LOG = LoggerFactory.getLogger(MobStressToolRunner.class);

  // Mini-cluster handle. It is not assigned anywhere in this class, so the branches guarded by
  // "HTU != null" below only run if something outside this class sets it.
  private HBaseTestingUtil HTU;

  private static final String famStr = "f1";
  private static final byte[] fam = Bytes.toBytes(famStr);
  private static final byte[] qualifier = Bytes.toBytes("q1");
  // Values longer than this many bytes are stored as MOB cells.
  private static final long mobLen = 10;
  private static final byte[] mobVal = Bytes
    .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");

  private Configuration conf;
  private TableDescriptor tableDescriptor;
  private ColumnFamilyDescriptor familyDescriptor;
  // Connection opened by init(); closed (together with admin and table) by runStressTest().
  private Connection conn;
  private Admin admin;
  private long count = 500000;
  private double failureProb = 0.1;
  private Table table = null;

  // Cleared by the writer thread once all rows are loaded; stops the background threads' loops.
  private static volatile boolean run = true;

  public MobStressToolRunner() {
  }

  /**
   * Applies the test configuration, connects to the cluster and (re)creates the MOB-enabled test
   * table.
   * @param conf    configuration to modify and connect with
   * @param numRows number of rows the writer thread will load; row keys are 4-byte ints, so this
   *                must be in {@code [0, Integer.MAX_VALUE]}
   * @throws IOException              if the cluster operations fail
   * @throws IllegalArgumentException if {@code numRows} cannot be represented by an int row key
   */
  public void init(Configuration conf, long numRows) throws IOException {
    // The writer loop counts with an int; a larger bound would make it wrap and never terminate.
    if (numRows < 0 || numRows > Integer.MAX_VALUE) {
      throw new IllegalArgumentException(
        "numRows must be between 0 and " + Integer.MAX_VALUE + ": " + numRows);
    }
    this.conf = conf;
    this.count = numRows;
    initConf();
    printConf();
    this.conn = ConnectionFactory.createConnection(this.conf);
    this.admin = conn.getAdmin();
    this.familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true)
      .setMobThreshold(mobLen).setMaxVersions(1).build();
    this.tableDescriptor =
      TableDescriptorBuilder.newBuilder(TableName.valueOf("testMobCompactTable"))
        .setColumnFamily(familyDescriptor).build();
    TableName tableName = tableDescriptor.getTableName();
    if (admin.tableExists(tableName)) {
      // A table left behind by an aborted run may already be disabled; disabling it again fails.
      if (admin.isTableEnabled(tableName)) {
        admin.disableTable(tableName);
      }
      admin.deleteTable(tableName);
    }
    admin.createTable(tableDescriptor);
    table = conn.getTable(tableName);
  }

  /** Logs the cluster-side settings the operator is expected to have configured. */
  private void printConf() {
    LOG.info("Please ensure the following HBase configuration is set:");
    LOG.info("hfile.format.version=3");
    LOG.info("hbase.master.hfilecleaner.ttl=0");
    LOG.info("hbase.hregion.max.filesize=200000000");
    LOG.info("hbase.client.retries.number=100");
    LOG.info("hbase.hregion.memstore.flush.size=800000");
    LOG.info("hbase.hstore.blockingStoreFiles=150");
    LOG.info("hbase.hstore.compaction.throughput.lower.bound=50000000");
    LOG.info("hbase.hstore.compaction.throughput.higher.bound=100000000");
    LOG.info("hbase.master.mob.cleaner.period=0");
    LOG.info("hbase.mob.default.compactor=org.apache.hadoop.hbase.mob.FaultyMobStoreCompactor");
    LOG.warn("hbase.mob.compaction.fault.probability=x, where x is between 0. and 1.");

  }

  /** Writes the stress-test settings into {@link #conf}. */
  private void initConf() {

    conf.setInt("hfile.format.version", 3);
    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
    conf.setInt("hbase.client.retries.number", 100);
    conf.setInt("hbase.hregion.max.filesize", 200000000);
    conf.setInt("hbase.hregion.memstore.flush.size", 800000);
    conf.setInt("hbase.hstore.blockingStoreFiles", 150);
    conf.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800);
    conf.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800);
    conf.setDouble("hbase.mob.compaction.fault.probability", failureProb);
    // Left disabled here; printConf() asks the operator to configure the faulty compactor.
    // conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY,
    // FaultyMobStoreCompactor.class.getName());
    // Period 0 for the chore and the cleaner; this tool drives compaction and cleanup itself.
    conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
    conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, 120000);
    conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
    conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);

  }

  /**
   * Requests a major compaction of the MOB family, then sleeps 120 s, until {@code run} is
   * cleared. Any failure aborts the whole JVM.
   */
  class MajorCompaction implements Runnable {

    @Override
    public void run() {
      while (run) {
        try {
          admin.majorCompact(tableDescriptor.getTableName(), fam);
          Thread.sleep(120000);
        } catch (Exception e) {
          LOG.error("MOB Stress Test FAILED", e);
          System.exit(-1);
        }
      }
    }
  }

  /**
   * Removes obsolete MOB files, then sleeps 130 s, until {@code run} is cleared. Failures are
   * logged and the loop keeps going.
   */
  class CleanMobAndArchive implements Runnable {

    @Override
    public void run() {
      while (run) {
        try {
          LOG.info("MOB cleanup started ...");
          MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin);
          LOG.info("MOB cleanup finished");

          Thread.sleep(130000);
        } catch (Exception e) {
          LOG.error("CleanMobAndArchive", e);
        }
      }
    }
  }

  /**
   * Loads {@code rows} rows (int key, value = key bytes followed by {@link #mobVal}), flushes the
   * table and then clears {@code run}. Any failure aborts the whole JVM.
   */
  class WriteData implements Runnable {

    private final long rows;

    public WriteData(long rows) {
      this.rows = rows;
    }

    @Override
    public void run() {
      try {

        // Put Operation
        for (int i = 0; i < rows; i++) {
          byte[] key = Bytes.toBytes(i);
          Put p = new Put(key);
          p.addColumn(fam, qualifier, Bytes.add(key, mobVal));
          table.put(p);
          if (i % 10000 == 0) {
            LOG.info("LOADED={}", i);
            try {
              Thread.sleep(500);
            } catch (InterruptedException ignored) {
              // Pacing pause only: an interrupt just shortens it and loading continues.
            }
          }
          if (i % 100000 == 0) {
            printStats(i);
          }
        }
        admin.flush(table.getName());
        run = false;
      } catch (Exception e) {
        LOG.error("MOB Stress Test FAILED", e);
        System.exit(-1);
      }
    }
  }

  /**
   * Runs the writer, major-compaction and cleanup threads, waits for the load to finish, performs
   * a final cleanup and verifies every row. The test table is always dropped and the connection
   * opened by {@link #init(Configuration, long)} is always closed afterwards.
   * @throws InterruptedException if interrupted while waiting for the writer
   * @throws IOException          if cleanup or table removal fails
   */
  public void runStressTest() throws InterruptedException, IOException {

    try {

      Thread writeData = new Thread(new WriteData(count));
      writeData.start();

      Thread majorcompact = new Thread(new MajorCompaction());
      majorcompact.start();

      Thread cleaner = new Thread(new CleanMobAndArchive());
      cleaner.start();

      while (run) {
        Thread.sleep(1000);
      }

      getNumberOfMobFiles(conf, table.getName(), famStr);
      LOG.info("Waiting for write thread to finish ...");
      writeData.join();
      // Cleanup again
      MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin);
      getNumberOfMobFiles(conf, table.getName(), famStr);

      if (HTU != null) {
        LOG.info("Archive cleaner started ...");
        // Call archive cleaner again
        HTU.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
        LOG.info("Archive cleaner finished");
      }

      scanTable();

    } finally {
      try {
        admin.disableTable(tableDescriptor.getTableName());
        admin.deleteTable(tableDescriptor.getTableName());
      } finally {
        // Release client resources even if dropping the table failed.
        closeConnection();
      }
    }
    LOG.info("MOB Stress Test finished OK");
    printStats(count);

  }

  /** Closes the table, admin and connection opened by init(), logging rather than throwing. */
  private void closeConnection() {
    closeQuietly(table, "table");
    closeQuietly(admin, "admin");
    closeQuietly(conn, "connection");
  }

  /** Closes {@code resource} if it is non-null; a failure is logged at WARN and not rethrown. */
  private static void closeQuietly(AutoCloseable resource, String what) {
    if (resource == null) {
      return;
    }
    try {
      resource.close();
    } catch (Exception e) {
      LOG.warn("Failed to close {}", what, e);
    }
  }

  /**
   * Logs the entries of the MOB family directory and returns how many there are.
   * @return number of entries directly under the MOB family directory
   * @throws IOException if the directory cannot be listed
   */
  private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
    throws IOException {
    Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
    // Resolve the FileSystem from the path so a root dir outside the default FS is still found.
    FileSystem fs = dir.getFileSystem(conf);
    FileStatus[] stat = fs.listStatus(dir);
    long size = 0;
    for (FileStatus st : stat) {
      LOG.debug("MOB Directory content: {} len={}", st.getPath(), st.getLen());
      size += st.getLen();
    }
    LOG.debug("MOB Directory content total files: {}, total size={}", stat.length, size);

    return stat.length;
  }

  /** Logs the number of loaded rows together with the FaultyMobStoreCompactor counters. */
  public void printStats(long loaded) {
    LOG.info("MOB Stress Test: loaded={} compactions={} major={} mob={} injected failures={}",
      loaded, FaultyMobStoreCompactor.totalCompactions.get(),
      FaultyMobStoreCompactor.totalMajorCompactions.get(),
      FaultyMobStoreCompactor.mobCounter.get(), FaultyMobStoreCompactor.totalFailures.get());
  }

  /**
   * Scans the MOB family and checks each row's value and the total row count. On an exception the
   * failure is logged; then the JVM exits, or the JUnit assertion fails if {@code HTU} is set.
   */
  private void scanTable() {
    try (ResultScanner scanner = table.getScanner(fam)) {

      Result result;
      int counter = 0;
      while ((result = scanner.next()) != null) {
        byte[] key = result.getRow();
        assertTrue(Arrays.equals(result.getValue(fam, qualifier), Bytes.add(key, mobVal)));
        if (counter % 10000 == 0) {
          LOG.info("GET={} key={}", counter, Bytes.toInt(key));
        }
        counter++;
      }

      assertEquals(count, counter);
    } catch (Exception e) {
      LOG.error("MOB Stress Test FAILED", e);
      if (HTU != null) {
        assertTrue(false);
      } else {
        System.exit(-1);
      }
    }
  }
}