/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
import org.apache.hadoop.hbase.util.MultiThreadedAction;
import org.apache.hadoop.hbase.util.MultiThreadedReader;
import org.apache.hadoop.hbase.util.MultiThreadedWriter;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.yetus.audience.InterfaceAudience;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * A performance test that runs large data ingestion under both stripe compactions and regular
 * compactions, so that the two can be compared.
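 * <p>
 * Example option string (a sketch; each flag below is defined as an option in this class):
 * {@code -datagen sequential -iters 2 -pwk 1000000 -wk 1000000 -wt 10 -rt 20 -valsize 512:4096}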
 */
@InterfaceAudience.Private
public class StripeCompactionsPerformanceEvaluation extends AbstractHBaseTool {
  private static final Logger LOG =
    LoggerFactory.getLogger(StripeCompactionsPerformanceEvaluation.class);

  private static final TableName TABLE_NAME =
    TableName.valueOf(StripeCompactionsPerformanceEvaluation.class.getSimpleName());
  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("CF");
  private static final int MIN_NUM_SERVERS = 1;

  // Option names.
  private static final String DATAGEN_KEY = "datagen";
  private static final String ITERATIONS_KEY = "iters";
  private static final String PRELOAD_COUNT_KEY = "pwk";
  private static final String WRITE_COUNT_KEY = "wk";
  private static final String WRITE_THREADS_KEY = "wt";
  private static final String READ_THREADS_KEY = "rt";
  private static final String INITIAL_STRIPE_COUNT_KEY = "initstripes";
  private static final String SPLIT_SIZE_KEY = "splitsize";
  private static final String SPLIT_PARTS_KEY = "splitparts";
  private static final String VALUE_SIZE_KEY = "valsize";
  private static final String SEQ_SHARDS_PER_SERVER_KEY = "seqshards";

  // Option values.
  private LoadTestDataGenerator dataGen;
  private int iterationCount;
  private long preloadKeys;
  private long writeKeys;
  private int writeThreads;
  private int readThreads;
  private Long initialStripeCount;
  private Long splitSize;
  private Long splitParts;

  private static final String VALUE_SIZE_DEFAULT = "512:4096";

  protected IntegrationTestingUtility util = new IntegrationTestingUtility();

  @Override
  protected void addOptions() {
    addOptWithArg(DATAGEN_KEY, "Type of data generator to use (default or sequential)");
    addOptWithArg(SEQ_SHARDS_PER_SERVER_KEY, "The sequential generator shards the data into many"
      + " sequences; this sets the number of such shards per server (default is 1).");
    addOptWithArg(ITERATIONS_KEY, "Number of test iterations to run for each compaction type");
    addOptWithArg(PRELOAD_COUNT_KEY, "Number of keys to preload, per server");
    addOptWithArg(WRITE_COUNT_KEY, "Number of keys to write, per server");
    addOptWithArg(WRITE_THREADS_KEY, "Number of threads to use for writing");
    addOptWithArg(READ_THREADS_KEY, "Number of threads to use for reading");
    addOptWithArg(INITIAL_STRIPE_COUNT_KEY, "Number of stripes to split regions into initially");
    addOptWithArg(SPLIT_SIZE_KEY, "Size at which a stripe will split into more stripes");
    addOptWithArg(SPLIT_PARTS_KEY, "Number of stripes to split a stripe into when it splits");
    addOptWithArg(VALUE_SIZE_KEY, "Value size; either a number, or a colon-separated range;"
      + " default " + VALUE_SIZE_DEFAULT);
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    int minValueSize = 0, maxValueSize = 0;
    String valueSize = cmd.getOptionValue(VALUE_SIZE_KEY, VALUE_SIZE_DEFAULT);
    if (valueSize.contains(":")) {
      List<String> valueSizes = Splitter.on(':').splitToList(valueSize);
      if (valueSizes.size() != 2) {
        throw new RuntimeException("Invalid value size: " + valueSize);
      }
      minValueSize = Integer.parseInt(Iterables.get(valueSizes, 0));
      maxValueSize = Integer.parseInt(Iterables.get(valueSizes, 1));
    } else {
      minValueSize = maxValueSize = Integer.parseInt(valueSize);
    }
    String datagen = cmd.getOptionValue(DATAGEN_KEY, "default").toLowerCase(Locale.ROOT);
    if ("default".equals(datagen)) {
      dataGen = new MultiThreadedAction.DefaultDataGenerator(minValueSize, maxValueSize, 1, 1,
        new byte[][] { COLUMN_FAMILY });
    } else if ("sequential".equals(datagen)) {
      int shards = Integer.parseInt(cmd.getOptionValue(SEQ_SHARDS_PER_SERVER_KEY, "1"));
      dataGen = new SeqShardedDataGenerator(minValueSize, maxValueSize, shards);
    } else {
      throw new RuntimeException("Unknown " + DATAGEN_KEY + ": " + datagen);
    }
    iterationCount = Integer.parseInt(cmd.getOptionValue(ITERATIONS_KEY, "1"));
    preloadKeys = Long.parseLong(cmd.getOptionValue(PRELOAD_COUNT_KEY, "1000000"));
    writeKeys = Long.parseLong(cmd.getOptionValue(WRITE_COUNT_KEY, "1000000"));
    writeThreads = Integer.parseInt(cmd.getOptionValue(WRITE_THREADS_KEY, "10"));
    readThreads = Integer.parseInt(cmd.getOptionValue(READ_THREADS_KEY, "20"));
    initialStripeCount = getLongOrNull(cmd, INITIAL_STRIPE_COUNT_KEY);
    splitSize = getLongOrNull(cmd, SPLIT_SIZE_KEY);
    splitParts = getLongOrNull(cmd, SPLIT_PARTS_KEY);
  }

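  /** Returns the value of {@code option} parsed as a {@link Long}, or null if it was not set. */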
  private Long getLongOrNull(CommandLine cmd, String option) {
    if (!cmd.hasOption(option)) {
      return null;
    }
    return Long.parseLong(cmd.getOptionValue(option));
  }

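  /** Falls back to the integration testing utility's configuration when none has been set. */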
  @Override
  public Configuration getConf() {
    Configuration c = super.getConf();
    if (c == null && util != null) {
      conf = util.getConfiguration();
      c = conf;
    }
    return c;
  }

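  /**
   * Runs {@code iterationCount} pairs of tests, alternating between the stripe store engine and
   * the default store engine, so the two configurations can be compared on the same cluster.
   */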
  @Override
  protected int doWork() throws Exception {
    setUp();
    try {
      boolean isStripe = true;
      for (int i = 0; i < iterationCount * 2; ++i) {
        createTable(isStripe);
        runOneTest((isStripe ? "Stripe" : "Default") + i, conf);
        isStripe = !isStripe;
      }
      return 0;
    } finally {
      tearDown();
    }
  }

  private void setUp() throws Exception {
    this.util = new IntegrationTestingUtility();
    LOG.debug("Initializing/checking cluster has {} servers", MIN_NUM_SERVERS);
    util.initializeCluster(MIN_NUM_SERVERS);
    LOG.debug("Done initializing/checking cluster");
  }

  protected void deleteTable() throws Exception {
    if (util.getAdmin().tableExists(TABLE_NAME)) {
      LOG.info("Deleting table");
      if (!util.getAdmin().isTableDisabled(TABLE_NAME)) {
        util.getAdmin().disableTable(TABLE_NAME);
      }
      util.getAdmin().deleteTable(TABLE_NAME);
      LOG.info("Deleted table");
    }
  }

  private void createTable(boolean isStripe) throws Exception {
    createTable(createHtd(isStripe));
  }

  private void tearDown() throws Exception {
    deleteTable();
    LOG.info("Restoring the cluster");
    util.restoreCluster();
    LOG.info("Done restoring the cluster");
  }

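  /**
   * Runs one test iteration: optionally preloads {@code preloadKeys} keys per server, then runs
   * concurrent writers and readers over the next {@code writeKeys} keys per server, and fails if
   * any write or read error occurred.
   */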
  private void runOneTest(String description, Configuration conf) throws Exception {
    int numServers =
      util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size();
    long startKey = preloadKeys * numServers;
    long endKey = startKey + writeKeys * numServers;
    status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d",
      description, numServers, startKey, endKey));

    if (preloadKeys > 0) {
      MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
      long time = EnvironmentEdgeManager.currentTime();
      preloader.start(0, startKey, writeThreads);
      preloader.waitForFinish();
      if (preloader.getNumWriteFailures() > 0) {
        throw new IOException("Preload failed");
      }
      int waitTime = (int) Math.min(preloadKeys / 100, 30000); // arbitrary
      status(description + " preload took " + (EnvironmentEdgeManager.currentTime() - time) / 1000
        + "sec; sleeping for " + waitTime / 1000 + "sec for store to stabilize");
      Thread.sleep(waitTime);
    }

    MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
    MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, TABLE_NAME, 100);
    // reader.getMetrics().enable();
    reader.linkToWriter(writer);

    long testStartTime = EnvironmentEdgeManager.currentTime();
    writer.start(startKey, endKey, writeThreads);
    reader.start(startKey, endKey, readThreads);
    writer.waitForFinish();
    reader.waitForFinish();
    // reader.waitForVerification(300000);
    // reader.abortAndWaitForFinish();
    status("Readers and writers stopped for test " + description);

    boolean success = writer.getNumWriteFailures() == 0;
    if (!success) {
      LOG.error("Write failed");
    } else {
      success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0;
      if (!success) {
        LOG.error("Read failed");
      }
    }

    // Dump perf regardless of the result.
    /*
     * StringBuilder perfDump = new StringBuilder();
     * for (Pair<Long, Long> pt : reader.getMetrics().getCombinedCdf()) {
     *   perfDump.append(String.format("csvread,%s,%d,%d%n", description, pt.getFirst(),
     *     pt.getSecond()));
     * }
     * if (dumpTimePerf) {
     *   Iterator<Triple<Long, Double, Long>> timePerf = reader.getMetrics().getCombinedTimeSeries();
     *   while (timePerf.hasNext()) {
     *     Triple<Long, Double, Long> pt = timePerf.next();
     *     perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n", description, pt.getFirst(),
     *       pt.getThird(), pt.getSecond()));
     *   }
     * }
     * LOG.info("Performance data dump for " + description + " test: \n" + perfDump.toString());
     */
    status(description + " test took "
      + (EnvironmentEdgeManager.currentTime() - testStartTime) / 1000 + "sec");
    Assert.assertTrue(success);
  }

  private static void status(String s) {
    LOG.info("STATUS " + s);
    System.out.println(s);
  }

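  /**
   * Builds the table descriptor: region splitting is disabled, and when {@code isStripe} is set
   * the stripe store engine is configured, with a much higher blocking store file count since
   * stripe stores keep many more files per store than default stores.
   */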
  private TableDescriptorBuilder createHtd(boolean isStripe) throws Exception {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY));
    String noSplitsPolicy = DisabledRegionSplitPolicy.class.getName();
    builder.setValue(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, noSplitsPolicy);
    if (isStripe) {
      builder.setValue(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
      if (initialStripeCount != null) {
        builder.setValue(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialStripeCount.toString());
        builder.setValue(HStore.BLOCKING_STOREFILES_KEY, Long.toString(10 * initialStripeCount));
      } else {
        builder.setValue(HStore.BLOCKING_STOREFILES_KEY, "500");
      }
      if (splitSize != null) {
        builder.setValue(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize.toString());
      }
      if (splitParts != null) {
        builder.setValue(StripeStoreConfig.SPLIT_PARTS_KEY, splitParts.toString());
      }
    } else {
      builder.setValue(HStore.BLOCKING_STOREFILES_KEY, "10"); // default
    }
    return builder;
  }

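  /** Recreates the table, pre-split with hex-string split points, one region per live server. */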
  private void createTable(TableDescriptorBuilder builder) throws Exception {
    deleteTable();
    if (util.getHBaseClusterInterface() instanceof SingleProcessHBaseCluster) {
      LOG.warn("Test does not make a lot of sense for minicluster. Will set flush size low.");
      builder.setValue(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, "1048576");
    }
    byte[][] splits = new RegionSplitter.HexStringSplit()
      .split(util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());
    util.getAdmin().createTable(builder.build(), splits);
  }

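  /**
   * Data generator that splits the key space into {@code numPartitions} shards; each key gets a
   * zero-padded shard prefix ({@code keyBase % numPartitions}), so every shard is written as its
   * own ascending key sequence.
   */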
  public static class SeqShardedDataGenerator extends LoadTestDataGenerator {
    private static final byte[][] COLUMN_NAMES = new byte[][] { Bytes.toBytes("col1") };
    private static final int PAD_TO = 10; // width of the zero-padded sequence part of the key
    private static final int PREFIX_PAD_TO = 7; // width of the zero-padded shard prefix

    private final int numPartitions;

    public SeqShardedDataGenerator(int minValueSize, int maxValueSize, int numPartitions) {
      super(minValueSize, maxValueSize);
      this.numPartitions = numPartitions;
    }

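    /** Keys are the 7-digit shard prefix followed by the 10-digit zero-padded key number. */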
    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      String num = StringUtils.leftPad(String.valueOf(keyBase), PAD_TO, "0");
      return Bytes.toBytes(getPrefix(keyBase) + num);
    }

    private String getPrefix(long i) {
      return StringUtils.leftPad(String.valueOf((int) (i % numPartitions)), PREFIX_PAD_TO, "0");
    }

    @Override
    public byte[][] getColumnFamilies() {
      return new byte[][] { COLUMN_FAMILY };
    }

    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
      return COLUMN_NAMES;
    }

    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      return true;
    }
  }
}