/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reproduction for MOB data loss:
 * <ol>
 * <li>Settings: Region Size 200 MB, Flush threshold 800 KB.</li>
 * <li>Insert 10 Million records.</li>
 * <li>MOB Compaction and Archiver: a) Trigger MOB Compaction (every 2 minutes) b) Trigger major
 * compaction (every 2 minutes) c) Trigger archive cleaner (every 3 minutes).</li>
 * <li>Validate MOB data after complete data load.</li>
 * </ol>
 * This class is used by MobStressTool only. This is not a unit test.
 */
@SuppressWarnings("deprecation")
public class MobStressToolRunner {
  private static final Logger LOG = LoggerFactory.getLogger(MobStressToolRunner.class);

  private HBaseTestingUtility HTU;

  private final static String famStr = "f1";
  private final static byte[] fam = Bytes.toBytes(famStr);
  private final static byte[] qualifier = Bytes.toBytes("q1");
  private final static long mobLen = 10;
  private final static byte[] mobVal = Bytes
    .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");

  private Configuration conf;
  private HTableDescriptor hdt;
  private HColumnDescriptor hcd;
  private Admin admin;
  private long count = 500000;
  private double failureProb = 0.1;
  private Table table = null;

  private static volatile boolean run = true;

  public MobStressToolRunner() {

  }

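  /**
   * Stores the configuration and row count, applies the stress-test settings, then (re)creates the
   * MOB-enabled table "testMobCompactTable" with a single column family and opens Admin and Table
   * handles for it.
   */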
  public void init(Configuration conf, long numRows) throws IOException {
    this.conf = conf;
    this.count = numRows;
    initConf();
    printConf();
    hdt = new HTableDescriptor(TableName.valueOf("testMobCompactTable"));
    Connection conn = ConnectionFactory.createConnection(this.conf);
    this.admin = conn.getAdmin();
    this.hcd = new HColumnDescriptor(fam);
    this.hcd.setMobEnabled(true);
    this.hcd.setMobThreshold(mobLen);
    this.hcd.setMaxVersions(1);
    this.hdt.addFamily(hcd);
    if (admin.tableExists(hdt.getTableName())) {
      admin.disableTable(hdt.getTableName());
      admin.deleteTable(hdt.getTableName());
    }
    admin.createTable(hdt);
    table = conn.getTable(hdt.getTableName());
  }

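  /**
   * Logs the HBase settings an operator is expected to have applied on the cluster before running
   * the stress tool.
   */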
  private void printConf() {
    LOG.info("Please ensure the following HBase configuration is set:");
    LOG.info("hfile.format.version=3");
    LOG.info("hbase.master.hfilecleaner.ttl=0");
    LOG.info("hbase.hregion.max.filesize=200000000");
    LOG.info("hbase.client.retries.number=100");
    LOG.info("hbase.hregion.memstore.flush.size=800000");
    LOG.info("hbase.hstore.blockingStoreFiles=150");
    LOG.info("hbase.hstore.compaction.throughput.lower.bound=50000000");
    LOG.info("hbase.hstore.compaction.throughput.higher.bound=100000000");
    LOG.info("hbase.master.mob.cleaner.period=0");
    LOG.info("hbase.mob.default.compactor=org.apache.hadoop.hbase.mob.FaultyMobStoreCompactor");
    LOG.warn("hbase.mob.compaction.fault.probability=x, where x is between 0.0 and 1.0");

  }

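  /**
   * Applies the stress-test settings to the supplied configuration: small memstore flush size and
   * region size, zero HFile cleaner TTL, disabled MOB chores, the injected compaction failure
   * probability, and optimized MOB compaction with a small maximum MOB file size.
   */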
  private void initConf() {

    conf.setInt("hfile.format.version", 3);
    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
    conf.setInt("hbase.client.retries.number", 100);
    conf.setInt("hbase.hregion.max.filesize", 200000000);
    conf.setInt("hbase.hregion.memstore.flush.size", 800000);
    conf.setInt("hbase.hstore.blockingStoreFiles", 150);
    conf.setInt("hbase.hstore.compaction.throughput.lower.bound", 52428800);
    conf.setInt("hbase.hstore.compaction.throughput.higher.bound", 2 * 52428800);
    conf.setDouble("hbase.mob.compaction.fault.probability", failureProb);
    // conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY,
    // FaultyMobStoreCompactor.class.getName());
    conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
    conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, 120000);
    conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
    conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);

  }

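  /**
   * Background task that requests a major compaction of the test family every 2 minutes until the
   * data load completes.
   */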
  class MajorCompaction implements Runnable {

    @Override
    public void run() {
      while (run) {
        try {
          admin.majorCompact(hdt.getTableName(), fam);
          Thread.sleep(120000);
        } catch (Exception e) {
          LOG.error("MOB Stress Test FAILED", e);
          System.exit(-1);
        }
      }
    }
  }

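  /**
   * Background task that removes obsolete MOB files roughly every 130 seconds until the data load
   * completes.
   */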
  class CleanMobAndArchive implements Runnable {

    @Override
    public void run() {
      while (run) {
        try {
          LOG.info("MOB cleanup started ...");
          MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin);
          LOG.info("MOB cleanup finished");

          Thread.sleep(130000);
        } catch (Exception e) {
          LOG.error("CleanMobAndArchive", e);
        }
      }
    }
  }

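  /**
   * Loads the requested number of rows. Each value is the row key concatenated with the MOB
   * payload; the loader pauses briefly every 10,000 rows, prints stats every 100,000 rows, flushes
   * the table at the end and then signals the background tasks to stop.
   */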
  class WriteData implements Runnable {

    private long rows = -1;

    public WriteData(long rows) {
      this.rows = rows;
    }

    @Override
    public void run() {
      try {

        // Put Operation
        for (int i = 0; i < rows; i++) {
          byte[] key = Bytes.toBytes(i);
          Put p = new Put(key);
          p.addColumn(fam, qualifier, Bytes.add(key, mobVal));
          table.put(p);
          if (i % 10000 == 0) {
            LOG.info("LOADED=" + i);
            try {
              Thread.sleep(500);
            } catch (InterruptedException ee) {
            }
          }
          if (i % 100000 == 0) {
            printStats(i);
          }
        }
        admin.flush(table.getName());
        run = false;
      } catch (Exception e) {
        LOG.error("MOB Stress Test FAILED", e);
        System.exit(-1);
      }
    }
  }

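  /**
   * Runs the stress test: starts the loader, major compaction and cleanup threads, waits for the
   * load to finish, performs a final MOB cleanup (and, when a mini cluster is available, a final
   * archive cleaner run), verifies the table contents and finally drops the table.
   */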
  public void runStressTest() throws InterruptedException, IOException {

    try {

      Thread writeData = new Thread(new WriteData(count));
      writeData.start();

      Thread majorcompact = new Thread(new MajorCompaction());
      majorcompact.start();

      Thread cleaner = new Thread(new CleanMobAndArchive());
      cleaner.start();

      while (run) {
        Thread.sleep(1000);
      }

      getNumberOfMobFiles(conf, table.getName(), famStr);
      LOG.info("Waiting for write thread to finish ...");
      writeData.join();
      // Cleanup again
      MobFileCleanupUtil.cleanupObsoleteMobFiles(conf, table.getName(), admin);
      getNumberOfMobFiles(conf, table.getName(), famStr);

      if (HTU != null) {
        LOG.info("Archive cleaner started ...");
        // Call archive cleaner again
        HTU.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();
        LOG.info("Archive cleaner finished");
      }

      scanTable();

    } finally {

      admin.disableTable(hdt.getTableName());
      admin.deleteTable(hdt.getTableName());
    }
    LOG.info("MOB Stress Test finished OK");
    printStats(count);

  }

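  /**
   * Logs the contents of the MOB family directory and returns the number of files it contains.
   */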
  private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
    throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
    FileStatus[] stat = fs.listStatus(dir);
    long size = 0;
    for (FileStatus st : stat) {
      LOG.debug("MOB Directory content: {} len={}", st.getPath(), st.getLen());
      size += st.getLen();
    }
    LOG.debug("MOB Directory content total files: {}, total size={}", stat.length, size);

    return stat.length;
  }

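  /**
   * Logs the number of rows loaded so far along with the compaction and injected-failure counters
   * reported by FaultyMobStoreCompactor.
   */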
  public void printStats(long loaded) {
    LOG.info("MOB Stress Test: loaded=" + loaded + " compactions="
      + FaultyMobStoreCompactor.totalCompactions.get() + " major="
      + FaultyMobStoreCompactor.totalMajorCompactions.get() + " mob="
      + FaultyMobStoreCompactor.mobCounter.get() + " injected failures="
      + FaultyMobStoreCompactor.totalFailures.get());
  }

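  /**
   * Scans the whole table, verifying that every value equals its row key plus the MOB payload and
   * that the number of rows matches the number loaded.
   */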
  private void scanTable() {
    try {

      int counter = 0;
      try (ResultScanner scanner = table.getScanner(fam)) {
        Result result;
        while ((result = scanner.next()) != null) {
          byte[] key = result.getRow();
          assertTrue(Arrays.equals(result.getValue(fam, qualifier), Bytes.add(key, mobVal)));
          if (counter % 10000 == 0) {
            LOG.info("GET=" + counter + " key=" + Bytes.toInt(key));
          }
          counter++;
        }
      }

      assertEquals(count, counter);
    } catch (Exception e) {
      LOG.error("MOB Stress Test FAILED", e);
      if (HTU != null) {
        assertTrue(false);
      } else {
        System.exit(-1);
      }
    }
  }
}