/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
import org.apache.hadoop.hbase.util.MultiThreadedAction;
import org.apache.hadoop.hbase.util.MultiThreadedReader;
import org.apache.hadoop.hbase.util.MultiThreadedWriter;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.yetus.audience.InterfaceAudience;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * A perf test which does large data ingestion using stripe compactions and regular compactions.
 */
@InterfaceAudience.Private
public class StripeCompactionsPerformanceEvaluation extends AbstractHBaseTool {
  private static final Logger LOG =
    LoggerFactory.getLogger(StripeCompactionsPerformanceEvaluation.class);

  private static final TableName TABLE_NAME =
    TableName.valueOf(StripeCompactionsPerformanceEvaluation.class.getSimpleName());
  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("CF");
  private static final int MIN_NUM_SERVERS = 1;

  // Option names.
  private static final String DATAGEN_KEY = "datagen";
  private static final String ITERATIONS_KEY = "iters";
  private static final String PRELOAD_COUNT_KEY = "pwk";
  private static final String WRITE_COUNT_KEY = "wk";
  private static final String WRITE_THREADS_KEY = "wt";
  private static final String READ_THREADS_KEY = "rt";
  private static final String INITIAL_STRIPE_COUNT_KEY = "initstripes";
  private static final String SPLIT_SIZE_KEY = "splitsize";
  private static final String SPLIT_PARTS_KEY = "splitparts";
  private static final String VALUE_SIZE_KEY = "valsize";
  private static final String SEQ_SHARDS_PER_SERVER_KEY = "seqshards";

  // Option values.
  private LoadTestDataGenerator dataGen;
  private int iterationCount;
  private long preloadKeys;
  private long writeKeys;
  private int writeThreads;
  private int readThreads;
  private Long initialStripeCount;
  private Long splitSize;
  private Long splitParts;

  private static final String VALUE_SIZE_DEFAULT = "512:4096";

  protected IntegrationTestingUtility util = new IntegrationTestingUtility();

  @Override
  protected void addOptions() {
    addOptWithArg(DATAGEN_KEY, "Type of data generator to use (default or sequential)");
    addOptWithArg(SEQ_SHARDS_PER_SERVER_KEY, "Sequential generator will shard the data into many"
      + " sequences. The number of such shards per server is specified (default is 1).");
    addOptWithArg(ITERATIONS_KEY, "Number of iterations to run to compare");
    addOptWithArg(PRELOAD_COUNT_KEY, "Number of keys to preload, per server");
    addOptWithArg(WRITE_COUNT_KEY, "Number of keys to write, per server");
    addOptWithArg(WRITE_THREADS_KEY, "Number of threads to use for writing");
    addOptWithArg(READ_THREADS_KEY, "Number of threads to use for reading");
    addOptWithArg(INITIAL_STRIPE_COUNT_KEY, "Number of stripes to split regions into initially");
    addOptWithArg(SPLIT_SIZE_KEY, "Size at which a stripe will split into more stripes");
    addOptWithArg(SPLIT_PARTS_KEY, "Number of stripes to split a stripe into when it splits");
    addOptWithArg(VALUE_SIZE_KEY, "Value size; either a number, or a colon-separated range;"
      + " default " + VALUE_SIZE_DEFAULT);
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    int minValueSize = 0, maxValueSize = 0;
    String valueSize = cmd.getOptionValue(VALUE_SIZE_KEY, VALUE_SIZE_DEFAULT);
    if (valueSize.contains(":")) {
      List<String> valueSizes = Splitter.on(':').splitToList(valueSize);
      if (valueSizes.size() != 2) {
        throw new RuntimeException("Invalid value size: " + valueSize);
      }
      minValueSize = Integer.parseInt(Iterables.get(valueSizes, 0));
      maxValueSize = Integer.parseInt(Iterables.get(valueSizes, 1));
    } else {
      minValueSize = maxValueSize = Integer.parseInt(valueSize);
    }
    String datagen = cmd.getOptionValue(DATAGEN_KEY, "default").toLowerCase(Locale.ROOT);
    if ("default".equals(datagen)) {
      dataGen = new MultiThreadedAction.DefaultDataGenerator(minValueSize, maxValueSize, 1, 1,
        new byte[][] { COLUMN_FAMILY });
    } else if ("sequential".equals(datagen)) {
      int shards = Integer.parseInt(cmd.getOptionValue(SEQ_SHARDS_PER_SERVER_KEY, "1"));
      dataGen = new SeqShardedDataGenerator(minValueSize, maxValueSize, shards);
    } else {
      throw new RuntimeException("Unknown " + DATAGEN_KEY + ": " + datagen);
    }
    iterationCount = Integer.parseInt(cmd.getOptionValue(ITERATIONS_KEY, "1"));
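    // Note: the preload/write key counts are per-server; runOneTest multiplies them by the
    // number of live region servers to get the actual key ranges.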
    preloadKeys = Long.parseLong(cmd.getOptionValue(PRELOAD_COUNT_KEY, "1000000"));
    writeKeys = Long.parseLong(cmd.getOptionValue(WRITE_COUNT_KEY, "1000000"));
    writeThreads = Integer.parseInt(cmd.getOptionValue(WRITE_THREADS_KEY, "10"));
    readThreads = Integer.parseInt(cmd.getOptionValue(READ_THREADS_KEY, "20"));
    initialStripeCount = getLongOrNull(cmd, INITIAL_STRIPE_COUNT_KEY);
    splitSize = getLongOrNull(cmd, SPLIT_SIZE_KEY);
    splitParts = getLongOrNull(cmd, SPLIT_PARTS_KEY);
  }

  private Long getLongOrNull(CommandLine cmd, String option) {
    if (!cmd.hasOption(option)) return null;
    return Long.parseLong(cmd.getOptionValue(option));
  }

  @Override
  public Configuration getConf() {
    Configuration c = super.getConf();
    if (c == null && util != null) {
      conf = util.getConfiguration();
      c = conf;
    }
    return c;
  }

  @Override
  protected int doWork() throws Exception {
    setUp();
    try {
      boolean isStripe = true;
      for (int i = 0; i < iterationCount * 2; ++i) {
        createTable(isStripe);
        runOneTest((isStripe ? "Stripe" : "Default") + i, conf);
        isStripe = !isStripe;
      }
      return 0;
    } finally {
      tearDown();
    }
  }

  private void setUp() throws Exception {
    this.util = new IntegrationTestingUtility();
    LOG.debug("Initializing/checking cluster has " + MIN_NUM_SERVERS + " servers");
    util.initializeCluster(MIN_NUM_SERVERS);
    LOG.debug("Done initializing/checking cluster");
  }

  protected void deleteTable() throws Exception {
    if (util.getAdmin().tableExists(TABLE_NAME)) {
      LOG.info("Deleting table");
      if (!util.getAdmin().isTableDisabled(TABLE_NAME)) {
        util.getAdmin().disableTable(TABLE_NAME);
      }
      util.getAdmin().deleteTable(TABLE_NAME);
      LOG.info("Deleted table");
    }
  }

  private void createTable(boolean isStripe) throws Exception {
    createTable(createHtd(isStripe));
  }

  private void tearDown() throws Exception {
    deleteTable();
    LOG.info("Restoring the cluster");
    util.restoreCluster();
    LOG.info("Done restoring the cluster");
  }

  private void runOneTest(String description, Configuration conf) throws Exception {
    int numServers =
      util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size();
    long startKey = preloadKeys * numServers;
    long endKey = startKey + writeKeys * numServers;
    status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d",
      description, numServers, startKey, endKey));

    if (preloadKeys > 0) {
      MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
      long time = EnvironmentEdgeManager.currentTime();
      preloader.start(0, startKey, writeThreads);
      preloader.waitForFinish();
      if (preloader.getNumWriteFailures() > 0) {
        throw new IOException("Preload failed");
      }
      int waitTime = (int) Math.min(preloadKeys / 100, 30000); // arbitrary
      status(description + " preload took " + (EnvironmentEdgeManager.currentTime() - time) / 1000
        + "sec; sleeping for " + waitTime / 1000 + "sec for store to stabilize");
      Thread.sleep(waitTime);
    }

    MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
    MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, TABLE_NAME, 100);
    // reader.getMetrics().enable();
    reader.linkToWriter(writer);

    long testStartTime = EnvironmentEdgeManager.currentTime();
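    // Readers run concurrently with the writers; linking the reader to the writer (above) keeps
    // reads within the range of keys that have already been written.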
    writer.start(startKey, endKey, writeThreads);
    reader.start(startKey, endKey, readThreads);
    writer.waitForFinish();
    reader.waitForFinish();
    // reader.waitForVerification(300000);
    // reader.abortAndWaitForFinish();
    status("Readers and writers stopped for test " + description);

    boolean success = writer.getNumWriteFailures() == 0;
    if (!success) {
      LOG.error("Write failed");
    } else {
      success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0;
      if (!success) {
        LOG.error("Read failed");
      }
    }

    // Dump perf regardless of the result.
    /*
     * StringBuilder perfDump = new StringBuilder();
     * for (Pair<Long, Long> pt : reader.getMetrics().getCombinedCdf()) {
     *   perfDump.append(String.format("csvread,%s,%d,%d%n", description, pt.getFirst(),
     *     pt.getSecond()));
     * }
     * if (dumpTimePerf) {
     *   Iterator<Triple<Long, Double, Long>> timePerf =
     *     reader.getMetrics().getCombinedTimeSeries();
     *   while (timePerf.hasNext()) {
     *     Triple<Long, Double, Long> pt = timePerf.next();
     *     perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n", description, pt.getFirst(),
     *       pt.getThird(), pt.getSecond()));
     *   }
     * }
     * LOG.info("Performance data dump for " + description + " test: \n" + perfDump.toString());
     */
    status(description + " test took "
      + (EnvironmentEdgeManager.currentTime() - testStartTime) / 1000 + "sec");
    Assert.assertTrue(success);
  }

  private static void status(String s) {
    LOG.info("STATUS " + s);
    System.out.println(s);
  }

  private TableDescriptorBuilder createHtd(boolean isStripe) throws Exception {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY));
    String noSplitsPolicy = DisabledRegionSplitPolicy.class.getName();
    builder.setValue(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, noSplitsPolicy);
    if (isStripe) {
      builder.setValue(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
      if (initialStripeCount != null) {
        builder.setValue(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY,
          initialStripeCount.toString());
        builder.setValue(HStore.BLOCKING_STOREFILES_KEY, Long.toString(10 * initialStripeCount));
      } else {
        builder.setValue(HStore.BLOCKING_STOREFILES_KEY, "500");
      }
      if (splitSize != null) {
        builder.setValue(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize.toString());
      }
      if (splitParts != null) {
        builder.setValue(StripeStoreConfig.SPLIT_PARTS_KEY, splitParts.toString());
      }
    } else {
      builder.setValue(HStore.BLOCKING_STOREFILES_KEY, "10"); // default
    }
    return builder;
  }

  private void createTable(TableDescriptorBuilder builder) throws Exception {
    deleteTable();
    if (util.getHBaseClusterInterface() instanceof SingleProcessHBaseCluster) {
      LOG.warn("Test does not make a lot of sense for minicluster. Will set flush size low.");
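      // 1048576 bytes = 1 MB; a small memstore flush size makes the minicluster flush often
      // enough to produce store files for compactions to work on.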
      builder.setValue(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, "1048576");
    }
    byte[][] splits = new RegionSplitter.HexStringSplit()
      .split(util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());
    util.getAdmin().createTable(builder.build(), splits);
  }

  public static class SeqShardedDataGenerator extends LoadTestDataGenerator {
    private static final byte[][] COLUMN_NAMES = new byte[][] { Bytes.toBytes("col1") };
    private static final int PAD_TO = 10;
    private static final int PREFIX_PAD_TO = 7;

    private final int numPartitions;

    public SeqShardedDataGenerator(int minValueSize, int maxValueSize, int numPartitions) {
      super(minValueSize, maxValueSize);
      this.numPartitions = numPartitions;
    }

    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      String num = StringUtils.leftPad(String.valueOf(keyBase), PAD_TO, "0");
      return Bytes.toBytes(getPrefix(keyBase) + num);
    }

    private String getPrefix(long i) {
      return StringUtils.leftPad(String.valueOf((int) (i % numPartitions)), PREFIX_PAD_TO, "0");
    }

    @Override
    public byte[][] getColumnFamilies() {
      return new byte[][] { COLUMN_FAMILY };
    }

    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
      return COLUMN_NAMES;
    }

    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      return true;
    }
  }
}