001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import com.codahale.metrics.Histogram; 021import com.codahale.metrics.UniformReservoir; 022import io.opentelemetry.api.trace.Span; 023import io.opentelemetry.context.Scope; 024import java.io.IOException; 025import java.io.PrintStream; 026import java.lang.reflect.Constructor; 027import java.math.BigDecimal; 028import java.math.MathContext; 029import java.text.DecimalFormat; 030import java.text.SimpleDateFormat; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.Date; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Locale; 037import java.util.Map; 038import java.util.NoSuchElementException; 039import java.util.Properties; 040import java.util.Queue; 041import java.util.Random; 042import java.util.TreeMap; 043import java.util.concurrent.Callable; 044import java.util.concurrent.ExecutionException; 045import java.util.concurrent.ExecutorService; 046import java.util.concurrent.Executors; 047import java.util.concurrent.Future; 048import java.util.concurrent.ThreadLocalRandom; 049import org.apache.commons.lang3.StringUtils; 050import org.apache.hadoop.conf.Configuration; 051import org.apache.hadoop.conf.Configured; 052import org.apache.hadoop.fs.FileSystem; 053import org.apache.hadoop.fs.Path; 054import org.apache.hadoop.hbase.client.Admin; 055import org.apache.hadoop.hbase.client.Append; 056import org.apache.hadoop.hbase.client.AsyncConnection; 057import org.apache.hadoop.hbase.client.AsyncTable; 058import org.apache.hadoop.hbase.client.BufferedMutator; 059import org.apache.hadoop.hbase.client.BufferedMutatorParams; 060import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 061import org.apache.hadoop.hbase.client.Connection; 062import org.apache.hadoop.hbase.client.ConnectionFactory; 063import org.apache.hadoop.hbase.client.Consistency; 064import org.apache.hadoop.hbase.client.Delete; 065import org.apache.hadoop.hbase.client.Durability; 066import org.apache.hadoop.hbase.client.Get; 067import org.apache.hadoop.hbase.client.Increment; 068import org.apache.hadoop.hbase.client.Put; 069import org.apache.hadoop.hbase.client.RegionInfo; 070import org.apache.hadoop.hbase.client.RegionInfoBuilder; 071import org.apache.hadoop.hbase.client.RegionLocator; 072import org.apache.hadoop.hbase.client.Result; 073import org.apache.hadoop.hbase.client.ResultScanner; 074import org.apache.hadoop.hbase.client.RowMutations; 075import org.apache.hadoop.hbase.client.Scan; 076import org.apache.hadoop.hbase.client.Table; 077import org.apache.hadoop.hbase.client.TableDescriptor; 078import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 079import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 080import org.apache.hadoop.hbase.filter.BinaryComparator; 081import org.apache.hadoop.hbase.filter.Filter; 082import org.apache.hadoop.hbase.filter.FilterAllFilter; 083import org.apache.hadoop.hbase.filter.FilterList; 084import org.apache.hadoop.hbase.filter.PageFilter; 085import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 086import org.apache.hadoop.hbase.filter.WhileMatchFilter; 087import org.apache.hadoop.hbase.io.compress.Compression; 088import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 089import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 090import org.apache.hadoop.hbase.regionserver.BloomType; 091import org.apache.hadoop.hbase.regionserver.CompactingMemStore; 092import org.apache.hadoop.hbase.trace.TraceUtil; 093import org.apache.hadoop.hbase.util.ByteArrayHashKey; 094import org.apache.hadoop.hbase.util.Bytes; 095import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 096import org.apache.hadoop.hbase.util.GsonUtil; 097import org.apache.hadoop.hbase.util.Hash; 098import org.apache.hadoop.hbase.util.MurmurHash; 099import org.apache.hadoop.hbase.util.Pair; 100import org.apache.hadoop.hbase.util.RandomDistribution; 101import org.apache.hadoop.hbase.util.YammerHistogramUtils; 102import org.apache.hadoop.io.LongWritable; 103import org.apache.hadoop.io.Text; 104import org.apache.hadoop.mapreduce.Job; 105import org.apache.hadoop.mapreduce.Mapper; 106import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; 107import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 108import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; 109import org.apache.hadoop.util.Tool; 110import org.apache.hadoop.util.ToolRunner; 111import org.apache.yetus.audience.InterfaceAudience; 112import org.slf4j.Logger; 113import org.slf4j.LoggerFactory; 114 115import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 116import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 117import org.apache.hbase.thirdparty.com.google.gson.Gson; 118 119/** 120 * Script used evaluating HBase performance and scalability. Runs a HBase client that steps through 121 * one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random writes test, 122 * etc.). Pass on the command-line which test to run and how many clients are participating in this 123 * experiment. Run {@code PerformanceEvaluation --help} to obtain usage. 124 * <p> 125 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance 126 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper, 127 * pages 8-10. 128 * <p> 129 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as 130 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client does 131 * about 1GB of data, unless specified otherwise. 132 */ 133@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 134public class PerformanceEvaluation extends Configured implements Tool { 135 static final String RANDOM_SEEK_SCAN = "randomSeekScan"; 136 static final String RANDOM_READ = "randomRead"; 137 static final String PE_COMMAND_SHORTNAME = "pe"; 138 private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName()); 139 private static final Gson GSON = GsonUtil.createGson().create(); 140 141 public static final String TABLE_NAME = "TestTable"; 142 public static final String FAMILY_NAME_BASE = "info"; 143 public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0"); 144 public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0); 145 public static final int DEFAULT_VALUE_LENGTH = 1000; 146 public static final int ROW_LENGTH = 26; 147 148 private static final int ONE_GB = 1024 * 1024 * 1000; 149 private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH; 150 // TODO : should we make this configurable 151 private static final int TAG_LENGTH = 256; 152 private static final DecimalFormat FMT = new DecimalFormat("0.##"); 153 private static final MathContext CXT = MathContext.DECIMAL64; 154 private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000); 155 private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024); 156 private static final TestOptions DEFAULT_OPTS = new TestOptions(); 157 158 private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>(); 159 private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); 160 161 static { 162 addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead", 163 "Run async random read test"); 164 addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite", 165 "Run async random write test"); 166 addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead", 167 "Run async sequential read test"); 168 addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite", 169 "Run async sequential write test"); 170 addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)"); 171 addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test"); 172 addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test"); 173 addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN, 174 "Run random seek and scan 100 test"); 175 addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", 176 "Run random seek scan with both start and stop row (max 10 rows)"); 177 addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100", 178 "Run random seek scan with both start and stop row (max 100 rows)"); 179 addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000", 180 "Run random seek scan with both start and stop row (max 1000 rows)"); 181 addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000", 182 "Run random seek scan with both start and stop row (max 10000 rows)"); 183 addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test"); 184 addCommandDescriptor(RandomDeleteTest.class, "randomDelete", "Run random delete test"); 185 addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test"); 186 addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test"); 187 addCommandDescriptor(SequentialDeleteTest.class, "sequentialDelete", 188 "Run sequential delete test"); 189 addCommandDescriptor(MetaWriteTest.class, "metaWrite", 190 "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta"); 191 addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)"); 192 addCommandDescriptor(ReverseScanTest.class, "reverseScan", 193 "Run reverse scan test (read every row)"); 194 addCommandDescriptor(FilteredScanTest.class, "filterScan", 195 "Run scan test using a filter to find a specific row based on it's value " 196 + "(make sure to use --rows=20)"); 197 addCommandDescriptor(IncrementTest.class, "increment", 198 "Increment on each row; clients overlap on keyspace so some concurrent operations"); 199 addCommandDescriptor(AppendTest.class, "append", 200 "Append on each row; clients overlap on keyspace so some concurrent operations"); 201 addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate", 202 "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations"); 203 addCommandDescriptor(CheckAndPutTest.class, "checkAndPut", 204 "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations"); 205 addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete", 206 "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations"); 207 addCommandDescriptor(CleanMetaTest.class, "cleanMeta", 208 "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread"); 209 } 210 211 /** 212 * Enum for map metrics. Keep it out here rather than inside in the Map inner-class so we can find 213 * associated properties. 214 */ 215 protected static enum Counter { 216 /** elapsed time */ 217 ELAPSED_TIME, 218 /** number of rows */ 219 ROWS 220 } 221 222 protected static class RunResult implements Comparable<RunResult> { 223 public RunResult(long duration, Histogram hist) { 224 this.duration = duration; 225 this.hist = hist; 226 numbOfReplyOverThreshold = 0; 227 numOfReplyFromReplica = 0; 228 } 229 230 public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica, 231 Histogram hist) { 232 this.duration = duration; 233 this.hist = hist; 234 this.numbOfReplyOverThreshold = numbOfReplyOverThreshold; 235 this.numOfReplyFromReplica = numOfReplyFromReplica; 236 } 237 238 public final long duration; 239 public final Histogram hist; 240 public final long numbOfReplyOverThreshold; 241 public final long numOfReplyFromReplica; 242 243 @Override 244 public String toString() { 245 return Long.toString(duration); 246 } 247 248 @Override 249 public int compareTo(RunResult o) { 250 return Long.compare(this.duration, o.duration); 251 } 252 253 @Override 254 public boolean equals(Object obj) { 255 if (this == obj) { 256 return true; 257 } 258 if (obj == null || getClass() != obj.getClass()) { 259 return false; 260 } 261 return this.compareTo((RunResult) obj) == 0; 262 } 263 264 @Override 265 public int hashCode() { 266 return Long.hashCode(duration); 267 } 268 } 269 270 /** 271 * Constructor 272 * @param conf Configuration object 273 */ 274 public PerformanceEvaluation(final Configuration conf) { 275 super(conf); 276 } 277 278 protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name, 279 String description) { 280 CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description); 281 COMMANDS.put(name, cmdDescriptor); 282 } 283 284 /** 285 * Implementations can have their status set. 286 */ 287 interface Status { 288 /** 289 * Sets status 290 * @param msg status message 291 */ 292 void setStatus(final String msg) throws IOException; 293 } 294 295 /** 296 * MapReduce job that runs a performance evaluation client in each map task. 297 */ 298 public static class EvaluationMapTask 299 extends Mapper<LongWritable, Text, LongWritable, LongWritable> { 300 301 /** configuration parameter name that contains the command */ 302 public final static String CMD_KEY = "EvaluationMapTask.command"; 303 /** configuration parameter name that contains the PE impl */ 304 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl"; 305 306 private Class<? extends Test> cmd; 307 308 @Override 309 protected void setup(Context context) throws IOException, InterruptedException { 310 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class); 311 312 // this is required so that extensions of PE are instantiated within the 313 // map reduce task... 314 Class<? extends PerformanceEvaluation> peClass = 315 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class); 316 try { 317 peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration()); 318 } catch (Exception e) { 319 throw new IllegalStateException("Could not instantiate PE instance", e); 320 } 321 } 322 323 private <Type> Class<? extends Type> forName(String className, Class<Type> type) { 324 try { 325 return Class.forName(className).asSubclass(type); 326 } catch (ClassNotFoundException e) { 327 throw new IllegalStateException("Could not find class for name: " + className, e); 328 } 329 } 330 331 @Override 332 protected void map(LongWritable key, Text value, final Context context) 333 throws IOException, InterruptedException { 334 335 Status status = new Status() { 336 @Override 337 public void setStatus(String msg) { 338 context.setStatus(msg); 339 } 340 }; 341 342 TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class); 343 Configuration conf = HBaseConfiguration.create(context.getConfiguration()); 344 final Connection con = ConnectionFactory.createConnection(conf); 345 AsyncConnection asyncCon = null; 346 try { 347 asyncCon = ConnectionFactory.createAsyncConnection(conf).get(); 348 } catch (ExecutionException e) { 349 throw new IOException(e); 350 } 351 352 // Evaluation task 353 RunResult result = 354 PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status); 355 // Collect how much time the thing took. Report as map output and 356 // to the ELAPSED_TIME counter. 357 context.getCounter(Counter.ELAPSED_TIME).increment(result.duration); 358 context.getCounter(Counter.ROWS).increment(opts.perClientRunRows); 359 context.write(new LongWritable(opts.startRow), new LongWritable(result.duration)); 360 context.progress(); 361 } 362 } 363 364 /* 365 * If table does not already exist, create. Also create a table when {@code opts.presplitRegions} 366 * is specified or when the existing table's region replica count doesn't match {@code 367 * opts.replicas}. 368 */ 369 static boolean checkTable(Admin admin, TestOptions opts) throws IOException { 370 TableName tableName = TableName.valueOf(opts.tableName); 371 boolean needsDelete = false, exists = admin.tableExists(tableName); 372 boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read") 373 || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan"); 374 boolean isDeleteCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("delete"); 375 if (!exists && (isReadCmd || isDeleteCmd)) { 376 throw new IllegalStateException( 377 "Must specify an existing table for read commands. Run a write command first."); 378 } 379 TableDescriptor desc = exists ? admin.getDescriptor(TableName.valueOf(opts.tableName)) : null; 380 byte[][] splits = getSplits(opts); 381 382 // recreate the table when user has requested presplit or when existing 383 // {RegionSplitPolicy,replica count} does not match requested, or when the 384 // number of column families does not match requested. 385 if ( 386 (exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions 387 && opts.presplitRegions != admin.getRegions(tableName).size()) 388 || (!isReadCmd && desc != null 389 && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy)) 390 || (!(isReadCmd || isDeleteCmd) && desc != null 391 && desc.getRegionReplication() != opts.replicas) 392 || (desc != null && desc.getColumnFamilyCount() != opts.families) 393 ) { 394 needsDelete = true; 395 // wait, why did it delete my table?!? 396 LOG.debug(MoreObjects.toStringHelper("needsDelete").add("needsDelete", needsDelete) 397 .add("isReadCmd", isReadCmd).add("exists", exists).add("desc", desc) 398 .add("presplit", opts.presplitRegions).add("splitPolicy", opts.splitPolicy) 399 .add("replicas", opts.replicas).add("families", opts.families).toString()); 400 } 401 402 // remove an existing table 403 if (needsDelete) { 404 if (admin.isTableEnabled(tableName)) { 405 admin.disableTable(tableName); 406 } 407 admin.deleteTable(tableName); 408 } 409 410 // table creation is necessary 411 if (!exists || needsDelete) { 412 desc = getTableDescriptor(opts); 413 if (splits != null) { 414 if (LOG.isDebugEnabled()) { 415 for (int i = 0; i < splits.length; i++) { 416 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); 417 } 418 } 419 } 420 if (splits != null) { 421 admin.createTable(desc, splits); 422 } else { 423 admin.createTable(desc); 424 } 425 LOG.info("Table " + desc + " created"); 426 } 427 return admin.tableExists(tableName); 428 } 429 430 /** 431 * Create an HTableDescriptor from provided TestOptions. 432 */ 433 protected static TableDescriptor getTableDescriptor(TestOptions opts) { 434 TableDescriptorBuilder builder = 435 TableDescriptorBuilder.newBuilder(TableName.valueOf(opts.tableName)); 436 437 for (int family = 0; family < opts.families; family++) { 438 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 439 ColumnFamilyDescriptorBuilder cfBuilder = 440 ColumnFamilyDescriptorBuilder.newBuilder(familyName); 441 cfBuilder.setDataBlockEncoding(opts.blockEncoding); 442 cfBuilder.setCompressionType(opts.compression); 443 cfBuilder.setEncryptionType(opts.encryption); 444 cfBuilder.setBloomFilterType(opts.bloomType); 445 cfBuilder.setBlocksize(opts.blockSize); 446 if (opts.inMemoryCF) { 447 cfBuilder.setInMemory(true); 448 } 449 cfBuilder.setInMemoryCompaction(opts.inMemoryCompaction); 450 builder.setColumnFamily(cfBuilder.build()); 451 } 452 if (opts.replicas != DEFAULT_OPTS.replicas) { 453 builder.setRegionReplication(opts.replicas); 454 } 455 if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) { 456 builder.setRegionSplitPolicyClassName(opts.splitPolicy); 457 } 458 return builder.build(); 459 } 460 461 /** 462 * generates splits based on total number of rows and specified split regions 463 */ 464 protected static byte[][] getSplits(TestOptions opts) { 465 if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null; 466 467 int numSplitPoints = opts.presplitRegions - 1; 468 byte[][] splits = new byte[numSplitPoints][]; 469 long jump = opts.totalRows / opts.presplitRegions; 470 for (int i = 0; i < numSplitPoints; i++) { 471 long rowkey = jump * (1 + i); 472 splits[i] = format(rowkey); 473 } 474 return splits; 475 } 476 477 static void setupConnectionCount(final TestOptions opts) { 478 if (opts.oneCon) { 479 opts.connCount = 1; 480 } else { 481 if (opts.connCount == -1) { 482 // set to thread number if connCount is not set 483 opts.connCount = opts.numClientThreads; 484 } 485 } 486 } 487 488 /* 489 * Run all clients in this vm each to its own thread. 490 */ 491 static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf) 492 throws IOException, InterruptedException, ExecutionException { 493 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName); 494 assert cmd != null; 495 @SuppressWarnings("unchecked") 496 Future<RunResult>[] threads = new Future[opts.numClientThreads]; 497 RunResult[] results = new RunResult[opts.numClientThreads]; 498 ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, 499 new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build()); 500 setupConnectionCount(opts); 501 final Connection[] cons = new Connection[opts.connCount]; 502 final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount]; 503 for (int i = 0; i < opts.connCount; i++) { 504 cons[i] = ConnectionFactory.createConnection(conf); 505 asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get(); 506 } 507 LOG 508 .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads"); 509 for (int i = 0; i < threads.length; i++) { 510 final int index = i; 511 threads[i] = pool.submit(new Callable<RunResult>() { 512 @Override 513 public RunResult call() throws Exception { 514 TestOptions threadOpts = new TestOptions(opts); 515 final Connection con = cons[index % cons.length]; 516 final AsyncConnection asyncCon = asyncCons[index % asyncCons.length]; 517 if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows; 518 RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() { 519 @Override 520 public void setStatus(final String msg) throws IOException { 521 LOG.info(msg); 522 } 523 }); 524 LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration 525 + "ms over " + threadOpts.perClientRunRows + " rows"); 526 if (opts.latencyThreshold > 0) { 527 LOG.info("Number of replies over latency threshold " + opts.latencyThreshold 528 + "(ms) is " + run.numbOfReplyOverThreshold); 529 } 530 return run; 531 } 532 }); 533 } 534 pool.shutdown(); 535 536 for (int i = 0; i < threads.length; i++) { 537 try { 538 results[i] = threads[i].get(); 539 } catch (ExecutionException e) { 540 throw new IOException(e.getCause()); 541 } 542 } 543 final String test = cmd.getSimpleName(); 544 LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results)); 545 Arrays.sort(results); 546 long total = 0; 547 float avgLatency = 0; 548 float avgTPS = 0; 549 long replicaWins = 0; 550 for (RunResult result : results) { 551 total += result.duration; 552 avgLatency += result.hist.getSnapshot().getMean(); 553 avgTPS += opts.perClientRunRows * 1.0f / result.duration; 554 replicaWins += result.numOfReplyFromReplica; 555 } 556 avgTPS *= 1000; // ms to second 557 avgLatency = avgLatency / results.length; 558 LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: " 559 + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms"); 560 LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency)); 561 LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second"); 562 if (opts.replicas > 1) { 563 LOG.info("[results from replica regions] " + replicaWins); 564 } 565 566 for (int i = 0; i < opts.connCount; i++) { 567 cons[i].close(); 568 asyncCons[i].close(); 569 } 570 571 return results; 572 } 573 574 /* 575 * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write 576 * out an input file with instruction per client regards which row they are to start on. 577 * @param cmd Command to run. 578 */ 579 static Job doMapReduce(TestOptions opts, final Configuration conf) 580 throws IOException, InterruptedException, ClassNotFoundException { 581 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName); 582 assert cmd != null; 583 Path inputDir = writeInputFile(conf, opts); 584 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); 585 conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName()); 586 Job job = Job.getInstance(conf); 587 job.setJarByClass(PerformanceEvaluation.class); 588 job.setJobName("HBase Performance Evaluation - " + opts.cmdName); 589 590 job.setInputFormatClass(NLineInputFormat.class); 591 NLineInputFormat.setInputPaths(job, inputDir); 592 // this is default, but be explicit about it just in case. 593 NLineInputFormat.setNumLinesPerSplit(job, 1); 594 595 job.setOutputKeyClass(LongWritable.class); 596 job.setOutputValueClass(LongWritable.class); 597 598 job.setMapperClass(EvaluationMapTask.class); 599 job.setReducerClass(LongSumReducer.class); 600 601 job.setNumReduceTasks(1); 602 603 job.setOutputFormatClass(TextOutputFormat.class); 604 TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs")); 605 606 TableMapReduceUtil.addDependencyJars(job); 607 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Histogram.class, // yammer 608 // metrics 609 Gson.class, // gson 610 FilterAllFilter.class // hbase-server tests jar 611 ); 612 613 TableMapReduceUtil.initCredentials(job); 614 615 job.waitForCompletion(true); 616 return job; 617 } 618 619 /** 620 * Each client has one mapper to do the work, and client do the resulting count in a map task. 621 */ 622 623 static String JOB_INPUT_FILENAME = "input.txt"; 624 625 /* 626 * Write input file of offsets-per-client for the mapreduce job. 627 * @param c Configuration 628 * @return Directory that contains file written whose name is JOB_INPUT_FILENAME 629 */ 630 static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException { 631 return writeInputFile(c, opts, new Path(".")); 632 } 633 634 static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir) 635 throws IOException { 636 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); 637 Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date())); 638 Path inputDir = new Path(jobdir, "inputs"); 639 640 FileSystem fs = FileSystem.get(c); 641 fs.mkdirs(inputDir); 642 643 Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME); 644 PrintStream out = new PrintStream(fs.create(inputFile)); 645 // Make input random. 646 Map<Integer, String> m = new TreeMap<>(); 647 Hash h = MurmurHash.getInstance(); 648 long perClientRows = (opts.totalRows / opts.numClientThreads); 649 try { 650 for (int j = 0; j < opts.numClientThreads; j++) { 651 TestOptions next = new TestOptions(opts); 652 next.startRow = j * perClientRows; 653 next.perClientRunRows = perClientRows; 654 String s = GSON.toJson(next); 655 LOG.info("Client=" + j + ", input=" + s); 656 byte[] b = Bytes.toBytes(s); 657 int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1); 658 m.put(hash, s); 659 } 660 for (Map.Entry<Integer, String> e : m.entrySet()) { 661 out.println(e.getValue()); 662 } 663 } finally { 664 out.close(); 665 } 666 return inputDir; 667 } 668 669 /** 670 * Describes a command. 671 */ 672 static class CmdDescriptor { 673 private Class<? extends TestBase> cmdClass; 674 private String name; 675 private String description; 676 677 CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) { 678 this.cmdClass = cmdClass; 679 this.name = name; 680 this.description = description; 681 } 682 683 public Class<? extends TestBase> getCmdClass() { 684 return cmdClass; 685 } 686 687 public String getName() { 688 return name; 689 } 690 691 public String getDescription() { 692 return description; 693 } 694 } 695 696 /** 697 * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes 698 * tracking all these arguments a little easier. NOTE: ADDING AN OPTION, you need to add a data 699 * member, a getter/setter (to make JSON serialization of this TestOptions class behave), and you 700 * need to add to the clone constructor below copying your new option from the 'that' to the 701 * 'this'. Look for 'clone' below. 702 */ 703 static class TestOptions { 704 String cmdName = null; 705 boolean nomapred = false; 706 boolean filterAll = false; 707 long startRow = 0; 708 float size = 1.0f; 709 long perClientRunRows = DEFAULT_ROWS_PER_GB; 710 int numClientThreads = 1; 711 long totalRows = DEFAULT_ROWS_PER_GB; 712 int measureAfter = 0; 713 float sampleRate = 1.0f; 714 /** 715 * @deprecated Useless after switching to OpenTelemetry 716 */ 717 @Deprecated 718 double traceRate = 0.0; 719 String tableName = TABLE_NAME; 720 boolean flushCommits = true; 721 boolean writeToWAL = true; 722 boolean autoFlush = false; 723 boolean oneCon = false; 724 int connCount = -1; // wil decide the actual num later 725 boolean useTags = false; 726 int noOfTags = 1; 727 boolean reportLatency = false; 728 int multiGet = 0; 729 int multiPut = 0; 730 int randomSleep = 0; 731 boolean inMemoryCF = false; 732 int presplitRegions = 0; 733 int replicas = TableDescriptorBuilder.DEFAULT_REGION_REPLICATION; 734 String splitPolicy = null; 735 Compression.Algorithm compression = Compression.Algorithm.NONE; 736 String encryption = null; 737 BloomType bloomType = BloomType.ROW; 738 int blockSize = HConstants.DEFAULT_BLOCKSIZE; 739 DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; 740 boolean valueRandom = false; 741 boolean valueZipf = false; 742 int valueSize = DEFAULT_VALUE_LENGTH; 743 long period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10; 744 int cycles = 1; 745 int columns = 1; 746 int families = 1; 747 int caching = 30; 748 int latencyThreshold = 0; // in millsecond 749 boolean addColumns = true; 750 MemoryCompactionPolicy inMemoryCompaction = 751 MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT); 752 boolean asyncPrefetch = false; 753 boolean cacheBlocks = true; 754 Scan.ReadType scanReadType = Scan.ReadType.DEFAULT; 755 long bufferSize = 2l * 1024l * 1024l; 756 Properties commandProperties; 757 758 public TestOptions() { 759 } 760 761 /** 762 * Clone constructor. 763 * @param that Object to copy from. 764 */ 765 public TestOptions(TestOptions that) { 766 this.cmdName = that.cmdName; 767 this.cycles = that.cycles; 768 this.nomapred = that.nomapred; 769 this.startRow = that.startRow; 770 this.size = that.size; 771 this.perClientRunRows = that.perClientRunRows; 772 this.numClientThreads = that.numClientThreads; 773 this.totalRows = that.totalRows; 774 this.sampleRate = that.sampleRate; 775 this.traceRate = that.traceRate; 776 this.tableName = that.tableName; 777 this.flushCommits = that.flushCommits; 778 this.writeToWAL = that.writeToWAL; 779 this.autoFlush = that.autoFlush; 780 this.oneCon = that.oneCon; 781 this.connCount = that.connCount; 782 this.useTags = that.useTags; 783 this.noOfTags = that.noOfTags; 784 this.reportLatency = that.reportLatency; 785 this.latencyThreshold = that.latencyThreshold; 786 this.multiGet = that.multiGet; 787 this.multiPut = that.multiPut; 788 this.inMemoryCF = that.inMemoryCF; 789 this.presplitRegions = that.presplitRegions; 790 this.replicas = that.replicas; 791 this.splitPolicy = that.splitPolicy; 792 this.compression = that.compression; 793 this.encryption = that.encryption; 794 this.blockEncoding = that.blockEncoding; 795 this.filterAll = that.filterAll; 796 this.bloomType = that.bloomType; 797 this.blockSize = that.blockSize; 798 this.valueRandom = that.valueRandom; 799 this.valueZipf = that.valueZipf; 800 this.valueSize = that.valueSize; 801 this.period = that.period; 802 this.randomSleep = that.randomSleep; 803 this.measureAfter = that.measureAfter; 804 this.addColumns = that.addColumns; 805 this.columns = that.columns; 806 this.families = that.families; 807 this.caching = that.caching; 808 this.inMemoryCompaction = that.inMemoryCompaction; 809 this.asyncPrefetch = that.asyncPrefetch; 810 this.cacheBlocks = that.cacheBlocks; 811 this.scanReadType = that.scanReadType; 812 this.bufferSize = that.bufferSize; 813 this.commandProperties = that.commandProperties; 814 } 815 816 public Properties getCommandProperties() { 817 return commandProperties; 818 } 819 820 public int getCaching() { 821 return this.caching; 822 } 823 824 public void setCaching(final int caching) { 825 this.caching = caching; 826 } 827 828 public int getColumns() { 829 return this.columns; 830 } 831 832 public void setColumns(final int columns) { 833 this.columns = columns; 834 } 835 836 public int getFamilies() { 837 return this.families; 838 } 839 840 public void setFamilies(final int families) { 841 this.families = families; 842 } 843 844 public int getCycles() { 845 return this.cycles; 846 } 847 848 public void setCycles(final int cycles) { 849 this.cycles = cycles; 850 } 851 852 public boolean isValueZipf() { 853 return valueZipf; 854 } 855 856 public void setValueZipf(boolean valueZipf) { 857 this.valueZipf = valueZipf; 858 } 859 860 public String getCmdName() { 861 return cmdName; 862 } 863 864 public void setCmdName(String cmdName) { 865 this.cmdName = cmdName; 866 } 867 868 public int getRandomSleep() { 869 return randomSleep; 870 } 871 872 public void setRandomSleep(int randomSleep) { 873 this.randomSleep = randomSleep; 874 } 875 876 public int getReplicas() { 877 return replicas; 878 } 879 880 public void setReplicas(int replicas) { 881 this.replicas = replicas; 882 } 883 884 public String getSplitPolicy() { 885 return splitPolicy; 886 } 887 888 public void setSplitPolicy(String splitPolicy) { 889 this.splitPolicy = splitPolicy; 890 } 891 892 public void setNomapred(boolean nomapred) { 893 this.nomapred = nomapred; 894 } 895 896 public void setFilterAll(boolean filterAll) { 897 this.filterAll = filterAll; 898 } 899 900 public void setStartRow(long startRow) { 901 this.startRow = startRow; 902 } 903 904 public void setSize(float size) { 905 this.size = size; 906 } 907 908 public void setPerClientRunRows(int perClientRunRows) { 909 this.perClientRunRows = perClientRunRows; 910 } 911 912 public void setNumClientThreads(int numClientThreads) { 913 this.numClientThreads = numClientThreads; 914 } 915 916 public void setTotalRows(long totalRows) { 917 this.totalRows = totalRows; 918 } 919 920 public void setSampleRate(float sampleRate) { 921 this.sampleRate = sampleRate; 922 } 923 924 public void setTraceRate(double traceRate) { 925 this.traceRate = traceRate; 926 } 927 928 public void setTableName(String tableName) { 929 this.tableName = tableName; 930 } 931 932 public void setFlushCommits(boolean flushCommits) { 933 this.flushCommits = flushCommits; 934 } 935 936 public void setWriteToWAL(boolean writeToWAL) { 937 this.writeToWAL = writeToWAL; 938 } 939 940 public void setAutoFlush(boolean autoFlush) { 941 this.autoFlush = autoFlush; 942 } 943 944 public void setOneCon(boolean oneCon) { 945 this.oneCon = oneCon; 946 } 947 948 public int getConnCount() { 949 return connCount; 950 } 951 952 public void setConnCount(int connCount) { 953 this.connCount = connCount; 954 } 955 956 public void setUseTags(boolean useTags) { 957 this.useTags = useTags; 958 } 959 960 public void setNoOfTags(int noOfTags) { 961 this.noOfTags = noOfTags; 962 } 963 964 public void setReportLatency(boolean reportLatency) { 965 this.reportLatency = reportLatency; 966 } 967 968 public void setMultiGet(int multiGet) { 969 this.multiGet = multiGet; 970 } 971 972 public void setMultiPut(int multiPut) { 973 this.multiPut = multiPut; 974 } 975 976 public void setInMemoryCF(boolean inMemoryCF) { 977 this.inMemoryCF = inMemoryCF; 978 } 979 980 public void setPresplitRegions(int presplitRegions) { 981 this.presplitRegions = presplitRegions; 982 } 983 984 public void setCompression(Compression.Algorithm compression) { 985 this.compression = compression; 986 } 987 988 public void setEncryption(String encryption) { 989 this.encryption = encryption; 990 } 991 992 public void setBloomType(BloomType bloomType) { 993 this.bloomType = bloomType; 994 } 995 996 public void setBlockSize(int blockSize) { 997 this.blockSize = blockSize; 998 } 999 1000 public void setBlockEncoding(DataBlockEncoding blockEncoding) { 1001 this.blockEncoding = blockEncoding; 1002 } 1003 1004 public void setValueRandom(boolean valueRandom) { 1005 this.valueRandom = valueRandom; 1006 } 1007 1008 public void setValueSize(int valueSize) { 1009 this.valueSize = valueSize; 1010 } 1011 1012 public void setBufferSize(long bufferSize) { 1013 this.bufferSize = bufferSize; 1014 } 1015 1016 public void setPeriod(int period) { 1017 this.period = period; 1018 } 1019 1020 public boolean isNomapred() { 1021 return nomapred; 1022 } 1023 1024 public boolean isFilterAll() { 1025 return filterAll; 1026 } 1027 1028 public long getStartRow() { 1029 return startRow; 1030 } 1031 1032 public float getSize() { 1033 return size; 1034 } 1035 1036 public long getPerClientRunRows() { 1037 return perClientRunRows; 1038 } 1039 1040 public int getNumClientThreads() { 1041 return numClientThreads; 1042 } 1043 1044 public long getTotalRows() { 1045 return totalRows; 1046 } 1047 1048 public float getSampleRate() { 1049 return sampleRate; 1050 } 1051 1052 public double getTraceRate() { 1053 return traceRate; 1054 } 1055 1056 public String getTableName() { 1057 return tableName; 1058 } 1059 1060 public boolean isFlushCommits() { 1061 return flushCommits; 1062 } 1063 1064 public boolean isWriteToWAL() { 1065 return writeToWAL; 1066 } 1067 1068 public boolean isAutoFlush() { 1069 return autoFlush; 1070 } 1071 1072 public boolean isUseTags() { 1073 return useTags; 1074 } 1075 1076 public int getNoOfTags() { 1077 return noOfTags; 1078 } 1079 1080 public boolean isReportLatency() { 1081 return reportLatency; 1082 } 1083 1084 public int getMultiGet() { 1085 return multiGet; 1086 } 1087 1088 public int getMultiPut() { 1089 return multiPut; 1090 } 1091 1092 public boolean isInMemoryCF() { 1093 return inMemoryCF; 1094 } 1095 1096 public int getPresplitRegions() { 1097 return presplitRegions; 1098 } 1099 1100 public Compression.Algorithm getCompression() { 1101 return compression; 1102 } 1103 1104 public String getEncryption() { 1105 return encryption; 1106 } 1107 1108 public DataBlockEncoding getBlockEncoding() { 1109 return blockEncoding; 1110 } 1111 1112 public boolean isValueRandom() { 1113 return valueRandom; 1114 } 1115 1116 public int getValueSize() { 1117 return valueSize; 1118 } 1119 1120 public long getPeriod() { 1121 return period; 1122 } 1123 1124 public BloomType getBloomType() { 1125 return bloomType; 1126 } 1127 1128 public int getBlockSize() { 1129 return blockSize; 1130 } 1131 1132 public boolean isOneCon() { 1133 return oneCon; 1134 } 1135 1136 public int getMeasureAfter() { 1137 return measureAfter; 1138 } 1139 1140 public void setMeasureAfter(int measureAfter) { 1141 this.measureAfter = measureAfter; 1142 } 1143 1144 public boolean getAddColumns() { 1145 return addColumns; 1146 } 1147 1148 public void setAddColumns(boolean addColumns) { 1149 this.addColumns = addColumns; 1150 } 1151 1152 public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) { 1153 this.inMemoryCompaction = inMemoryCompaction; 1154 } 1155 1156 public MemoryCompactionPolicy getInMemoryCompaction() { 1157 return this.inMemoryCompaction; 1158 } 1159 1160 public long getBufferSize() { 1161 return this.bufferSize; 1162 } 1163 } 1164 1165 /* 1166 * A test. Subclass to particularize what happens per row. 1167 */ 1168 static abstract class TestBase { 1169 private final long everyN; 1170 1171 protected final Configuration conf; 1172 protected final TestOptions opts; 1173 1174 protected final Status status; 1175 1176 private String testName; 1177 protected Histogram latencyHistogram; 1178 private Histogram replicaLatencyHistogram; 1179 private Histogram valueSizeHistogram; 1180 private Histogram rpcCallsHistogram; 1181 private Histogram remoteRpcCallsHistogram; 1182 private Histogram millisBetweenNextHistogram; 1183 private Histogram regionsScannedHistogram; 1184 private Histogram bytesInResultsHistogram; 1185 private Histogram bytesInRemoteResultsHistogram; 1186 private RandomDistribution.Zipf zipf; 1187 private long numOfReplyOverLatencyThreshold = 0; 1188 private long numOfReplyFromReplica = 0; 1189 1190 /** 1191 * Note that all subclasses of this class must provide a public constructor that has the exact 1192 * same list of arguments. 1193 */ 1194 TestBase(final Configuration conf, final TestOptions options, final Status status) { 1195 this.conf = conf; 1196 this.opts = options; 1197 this.status = status; 1198 this.testName = this.getClass().getSimpleName(); 1199 everyN = (long) (opts.totalRows / (opts.totalRows * opts.sampleRate)); 1200 if (options.isValueZipf()) { 1201 this.zipf = 1202 new RandomDistribution.Zipf(ThreadLocalRandom.current(), 1, options.getValueSize(), 1.2); 1203 } 1204 LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows."); 1205 } 1206 1207 int getValueLength() { 1208 if (this.opts.isValueRandom()) { 1209 return ThreadLocalRandom.current().nextInt(opts.valueSize); 1210 } else if (this.opts.isValueZipf()) { 1211 return Math.abs(this.zipf.nextInt()); 1212 } else { 1213 return opts.valueSize; 1214 } 1215 } 1216 1217 void updateValueSize(final Result[] rs) throws IOException { 1218 updateValueSize(rs, 0); 1219 } 1220 1221 void updateValueSize(final Result[] rs, final long latency) throws IOException { 1222 if (rs == null || (latency == 0)) return; 1223 for (Result r : rs) 1224 updateValueSize(r, latency); 1225 } 1226 1227 void updateValueSize(final Result r) throws IOException { 1228 updateValueSize(r, 0); 1229 } 1230 1231 void updateValueSize(final Result r, final long latency) throws IOException { 1232 if (r == null || (latency == 0)) return; 1233 int size = 0; 1234 // update replicaHistogram 1235 if (r.isStale()) { 1236 replicaLatencyHistogram.update(latency / 1000); 1237 numOfReplyFromReplica++; 1238 } 1239 if (!isRandomValueSize()) return; 1240 1241 for (CellScanner scanner = r.cellScanner(); scanner.advance();) { 1242 size += scanner.current().getValueLength(); 1243 } 1244 updateValueSize(size); 1245 } 1246 1247 void updateValueSize(final int valueSize) { 1248 if (!isRandomValueSize()) return; 1249 this.valueSizeHistogram.update(valueSize); 1250 } 1251 1252 void updateScanMetrics(final ScanMetrics metrics) { 1253 if (metrics == null) return; 1254 Map<String, Long> metricsMap = metrics.getMetricsMap(); 1255 Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); 1256 if (rpcCalls != null) { 1257 this.rpcCallsHistogram.update(rpcCalls.longValue()); 1258 } 1259 Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME); 1260 if (remoteRpcCalls != null) { 1261 this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue()); 1262 } 1263 Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME); 1264 if (millisBetweenNext != null) { 1265 this.millisBetweenNextHistogram.update(millisBetweenNext.longValue()); 1266 } 1267 Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); 1268 if (regionsScanned != null) { 1269 this.regionsScannedHistogram.update(regionsScanned.longValue()); 1270 } 1271 Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME); 1272 if (bytesInResults != null && bytesInResults.longValue() > 0) { 1273 this.bytesInResultsHistogram.update(bytesInResults.longValue()); 1274 } 1275 Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME); 1276 if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) { 1277 this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue()); 1278 } 1279 } 1280 1281 String generateStatus(final long sr, final long i, final long lr) { 1282 return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency [" 1283 + getShortLatencyReport() + "]" 1284 + (!isRandomValueSize() ? "" : ", value size [" + getShortValueSizeReport() + "]"); 1285 } 1286 1287 boolean isRandomValueSize() { 1288 return opts.valueRandom; 1289 } 1290 1291 protected long getReportingPeriod() { 1292 return opts.period; 1293 } 1294 1295 /** 1296 * Populated by testTakedown. Only implemented by RandomReadTest at the moment. 1297 */ 1298 public Histogram getLatencyHistogram() { 1299 return latencyHistogram; 1300 } 1301 1302 void testSetup() throws IOException { 1303 // test metrics 1304 latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1305 // If it is a replica test, set up histogram for replica. 1306 if (opts.replicas > 1) { 1307 replicaLatencyHistogram = 1308 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1309 } 1310 valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1311 // scan metrics 1312 rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1313 remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1314 millisBetweenNextHistogram = 1315 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1316 regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1317 bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1318 bytesInRemoteResultsHistogram = 1319 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1320 1321 onStartup(); 1322 } 1323 1324 abstract void onStartup() throws IOException; 1325 1326 void testTakedown() throws IOException { 1327 onTakedown(); 1328 // Print all stats for this thread continuously. 1329 // Synchronize on Test.class so different threads don't intermingle the 1330 // output. We can't use 'this' here because each thread has its own instance of Test class. 1331 synchronized (Test.class) { 1332 status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName()); 1333 status 1334 .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram)); 1335 if (opts.replicas > 1) { 1336 status.setStatus("Latency (us) from Replica Regions: " 1337 + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram)); 1338 } 1339 status.setStatus("Num measures (latency) : " + latencyHistogram.getCount()); 1340 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram)); 1341 if (valueSizeHistogram.getCount() > 0) { 1342 status.setStatus( 1343 "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram)); 1344 status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount()); 1345 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram)); 1346 } else { 1347 status.setStatus("No valueSize statistics available"); 1348 } 1349 if (rpcCallsHistogram.getCount() > 0) { 1350 status.setStatus( 1351 "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram)); 1352 } 1353 if (remoteRpcCallsHistogram.getCount() > 0) { 1354 status.setStatus("remoteRpcCalls (count): " 1355 + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram)); 1356 } 1357 if (millisBetweenNextHistogram.getCount() > 0) { 1358 status.setStatus("millisBetweenNext (latency): " 1359 + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram)); 1360 } 1361 if (regionsScannedHistogram.getCount() > 0) { 1362 status.setStatus("regionsScanned (count): " 1363 + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram)); 1364 } 1365 if (bytesInResultsHistogram.getCount() > 0) { 1366 status.setStatus("bytesInResults (size): " 1367 + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram)); 1368 } 1369 if (bytesInRemoteResultsHistogram.getCount() > 0) { 1370 status.setStatus("bytesInRemoteResults (size): " 1371 + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram)); 1372 } 1373 } 1374 } 1375 1376 abstract void onTakedown() throws IOException; 1377 1378 /* 1379 * Run test 1380 * @return Elapsed time. 1381 */ 1382 long test() throws IOException, InterruptedException { 1383 testSetup(); 1384 LOG.info("Timed test starting in thread " + Thread.currentThread().getName()); 1385 final long startTime = System.nanoTime(); 1386 try { 1387 testTimed(); 1388 } finally { 1389 testTakedown(); 1390 } 1391 return (System.nanoTime() - startTime) / 1000000; 1392 } 1393 1394 long getStartRow() { 1395 return opts.startRow; 1396 } 1397 1398 long getLastRow() { 1399 return getStartRow() + opts.perClientRunRows; 1400 } 1401 1402 /** 1403 * Provides an extension point for tests that don't want a per row invocation. 1404 */ 1405 void testTimed() throws IOException, InterruptedException { 1406 long startRow = getStartRow(); 1407 long lastRow = getLastRow(); 1408 // Report on completion of 1/10th of total. 1409 for (int ii = 0; ii < opts.cycles; ii++) { 1410 if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles); 1411 for (long i = startRow; i < lastRow; i++) { 1412 if (i % everyN != 0) continue; 1413 long startTime = System.nanoTime(); 1414 boolean requestSent = false; 1415 Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan(); 1416 try (Scope scope = span.makeCurrent()) { 1417 requestSent = testRow(i, startTime); 1418 } finally { 1419 span.end(); 1420 } 1421 if ((i - startRow) > opts.measureAfter) { 1422 // If multiget or multiput is enabled, say set to 10, testRow() returns immediately 1423 // first 9 times and sends the actual get request in the 10th iteration. 1424 // We should only set latency when actual request is sent because otherwise 1425 // it turns out to be 0. 1426 if (requestSent) { 1427 long latency = (System.nanoTime() - startTime) / 1000; 1428 latencyHistogram.update(latency); 1429 if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) { 1430 numOfReplyOverLatencyThreshold++; 1431 } 1432 } 1433 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { 1434 status.setStatus(generateStatus(startRow, i, lastRow)); 1435 } 1436 } 1437 } 1438 } 1439 } 1440 1441 /** Returns Subset of the histograms' calculation. */ 1442 public String getShortLatencyReport() { 1443 return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram); 1444 } 1445 1446 /** Returns Subset of the histograms' calculation. */ 1447 public String getShortValueSizeReport() { 1448 return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram); 1449 } 1450 1451 /** 1452 * Test for individual row. 1453 * @param i Row index. 1454 * @return true if the row was sent to server and need to record metrics. False if not, multiGet 1455 * and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered. 1456 */ 1457 abstract boolean testRow(final long i, final long startTime) 1458 throws IOException, InterruptedException; 1459 } 1460 1461 static abstract class Test extends TestBase { 1462 protected Connection connection; 1463 1464 Test(final Connection con, final TestOptions options, final Status status) { 1465 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); 1466 this.connection = con; 1467 } 1468 } 1469 1470 static abstract class AsyncTest extends TestBase { 1471 protected AsyncConnection connection; 1472 1473 AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) { 1474 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); 1475 this.connection = con; 1476 } 1477 } 1478 1479 static abstract class TableTest extends Test { 1480 protected Table table; 1481 1482 TableTest(Connection con, TestOptions options, Status status) { 1483 super(con, options, status); 1484 } 1485 1486 @Override 1487 void onStartup() throws IOException { 1488 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1489 } 1490 1491 @Override 1492 void onTakedown() throws IOException { 1493 table.close(); 1494 } 1495 } 1496 1497 /* 1498 * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest 1499 */ 1500 static abstract class MetaTest extends TableTest { 1501 protected int keyLength; 1502 1503 MetaTest(Connection con, TestOptions options, Status status) { 1504 super(con, options, status); 1505 keyLength = Long.toString(opts.perClientRunRows).length(); 1506 } 1507 1508 @Override 1509 void onTakedown() throws IOException { 1510 // No clean up 1511 } 1512 1513 /* 1514 * Generates Lexicographically ascending strings 1515 */ 1516 protected byte[] getSplitKey(final long i) { 1517 return Bytes.toBytes(String.format("%0" + keyLength + "d", i)); 1518 } 1519 1520 } 1521 1522 static abstract class AsyncTableTest extends AsyncTest { 1523 protected AsyncTable<?> table; 1524 1525 AsyncTableTest(AsyncConnection con, TestOptions options, Status status) { 1526 super(con, options, status); 1527 } 1528 1529 @Override 1530 void onStartup() throws IOException { 1531 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1532 } 1533 1534 @Override 1535 void onTakedown() throws IOException { 1536 } 1537 } 1538 1539 static class AsyncRandomReadTest extends AsyncTableTest { 1540 private final Consistency consistency; 1541 private ArrayList<Get> gets; 1542 1543 AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) { 1544 super(con, options, status); 1545 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1546 if (opts.multiGet > 0) { 1547 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1548 this.gets = new ArrayList<>(opts.multiGet); 1549 } 1550 } 1551 1552 @Override 1553 boolean testRow(final long i, final long startTime) throws IOException, InterruptedException { 1554 if (opts.randomSleep > 0) { 1555 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 1556 } 1557 Get get = new Get(getRandomRow(opts.totalRows)); 1558 for (int family = 0; family < opts.families; family++) { 1559 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1560 if (opts.addColumns) { 1561 for (int column = 0; column < opts.columns; column++) { 1562 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1563 get.addColumn(familyName, qualifier); 1564 } 1565 } else { 1566 get.addFamily(familyName); 1567 } 1568 } 1569 if (opts.filterAll) { 1570 get.setFilter(new FilterAllFilter()); 1571 } 1572 get.setConsistency(consistency); 1573 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1574 try { 1575 if (opts.multiGet > 0) { 1576 this.gets.add(get); 1577 if (this.gets.size() == opts.multiGet) { 1578 Result[] rs = 1579 this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new); 1580 updateValueSize(rs); 1581 this.gets.clear(); 1582 } else { 1583 return false; 1584 } 1585 } else { 1586 updateValueSize(this.table.get(get).get()); 1587 } 1588 } catch (ExecutionException e) { 1589 throw new IOException(e); 1590 } 1591 return true; 1592 } 1593 1594 public static RuntimeException runtime(Throwable e) { 1595 if (e instanceof RuntimeException) { 1596 return (RuntimeException) e; 1597 } 1598 return new RuntimeException(e); 1599 } 1600 1601 public static <V> V propagate(Callable<V> callable) { 1602 try { 1603 return callable.call(); 1604 } catch (Exception e) { 1605 throw runtime(e); 1606 } 1607 } 1608 1609 @Override 1610 protected long getReportingPeriod() { 1611 long period = opts.perClientRunRows / 10; 1612 return period == 0 ? opts.perClientRunRows : period; 1613 } 1614 1615 @Override 1616 protected void testTakedown() throws IOException { 1617 if (this.gets != null && this.gets.size() > 0) { 1618 this.table.get(gets); 1619 this.gets.clear(); 1620 } 1621 super.testTakedown(); 1622 } 1623 } 1624 1625 static class AsyncRandomWriteTest extends AsyncSequentialWriteTest { 1626 1627 AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) { 1628 super(con, options, status); 1629 } 1630 1631 @Override 1632 protected byte[] generateRow(final long i) { 1633 return getRandomRow(opts.totalRows); 1634 } 1635 } 1636 1637 static class AsyncScanTest extends AsyncTableTest { 1638 private ResultScanner testScanner; 1639 private AsyncTable<?> asyncTable; 1640 1641 AsyncScanTest(AsyncConnection con, TestOptions options, Status status) { 1642 super(con, options, status); 1643 } 1644 1645 @Override 1646 void onStartup() throws IOException { 1647 this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName), 1648 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); 1649 } 1650 1651 @Override 1652 void testTakedown() throws IOException { 1653 if (this.testScanner != null) { 1654 updateScanMetrics(this.testScanner.getScanMetrics()); 1655 this.testScanner.close(); 1656 } 1657 super.testTakedown(); 1658 } 1659 1660 @Override 1661 boolean testRow(final long i, final long startTime) throws IOException { 1662 if (this.testScanner == null) { 1663 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 1664 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1665 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1666 for (int family = 0; family < opts.families; family++) { 1667 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1668 if (opts.addColumns) { 1669 for (int column = 0; column < opts.columns; column++) { 1670 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1671 scan.addColumn(familyName, qualifier); 1672 } 1673 } else { 1674 scan.addFamily(familyName); 1675 } 1676 } 1677 if (opts.filterAll) { 1678 scan.setFilter(new FilterAllFilter()); 1679 } 1680 this.testScanner = asyncTable.getScanner(scan); 1681 } 1682 Result r = testScanner.next(); 1683 updateValueSize(r); 1684 return true; 1685 } 1686 } 1687 1688 static class AsyncSequentialReadTest extends AsyncTableTest { 1689 AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) { 1690 super(con, options, status); 1691 } 1692 1693 @Override 1694 boolean testRow(final long i, final long startTime) throws IOException, InterruptedException { 1695 Get get = new Get(format(i)); 1696 for (int family = 0; family < opts.families; family++) { 1697 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1698 if (opts.addColumns) { 1699 for (int column = 0; column < opts.columns; column++) { 1700 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1701 get.addColumn(familyName, qualifier); 1702 } 1703 } else { 1704 get.addFamily(familyName); 1705 } 1706 } 1707 if (opts.filterAll) { 1708 get.setFilter(new FilterAllFilter()); 1709 } 1710 try { 1711 updateValueSize(table.get(get).get()); 1712 } catch (ExecutionException e) { 1713 throw new IOException(e); 1714 } 1715 return true; 1716 } 1717 } 1718 1719 static class AsyncSequentialWriteTest extends AsyncTableTest { 1720 private ArrayList<Put> puts; 1721 1722 AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) { 1723 super(con, options, status); 1724 if (opts.multiPut > 0) { 1725 LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + "."); 1726 this.puts = new ArrayList<>(opts.multiPut); 1727 } 1728 } 1729 1730 protected byte[] generateRow(final long i) { 1731 return format(i); 1732 } 1733 1734 @Override 1735 @SuppressWarnings("ReturnValueIgnored") 1736 boolean testRow(final long i, final long startTime) throws IOException, InterruptedException { 1737 byte[] row = generateRow(i); 1738 Put put = new Put(row); 1739 for (int family = 0; family < opts.families; family++) { 1740 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1741 for (int column = 0; column < opts.columns; column++) { 1742 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1743 byte[] value = generateData(getValueLength()); 1744 if (opts.useTags) { 1745 byte[] tag = generateData(TAG_LENGTH); 1746 Tag[] tags = new Tag[opts.noOfTags]; 1747 for (int n = 0; n < opts.noOfTags; n++) { 1748 Tag t = new ArrayBackedTag((byte) n, tag); 1749 tags[n] = t; 1750 } 1751 KeyValue kv = 1752 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 1753 put.add(kv); 1754 updateValueSize(kv.getValueLength()); 1755 } else { 1756 put.addColumn(familyName, qualifier, value); 1757 updateValueSize(value.length); 1758 } 1759 } 1760 } 1761 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1762 try { 1763 table.put(put).get(); 1764 if (opts.multiPut > 0) { 1765 this.puts.add(put); 1766 if (this.puts.size() == opts.multiPut) { 1767 this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get)); 1768 this.puts.clear(); 1769 } else { 1770 return false; 1771 } 1772 } else { 1773 table.put(put).get(); 1774 } 1775 } catch (ExecutionException e) { 1776 throw new IOException(e); 1777 } 1778 return true; 1779 } 1780 } 1781 1782 static abstract class BufferedMutatorTest extends Test { 1783 protected BufferedMutator mutator; 1784 protected Table table; 1785 1786 BufferedMutatorTest(Connection con, TestOptions options, Status status) { 1787 super(con, options, status); 1788 } 1789 1790 @Override 1791 void onStartup() throws IOException { 1792 BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName)); 1793 p.writeBufferSize(opts.bufferSize); 1794 this.mutator = connection.getBufferedMutator(p); 1795 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1796 } 1797 1798 @Override 1799 void onTakedown() throws IOException { 1800 mutator.close(); 1801 table.close(); 1802 } 1803 } 1804 1805 static class RandomSeekScanTest extends TableTest { 1806 RandomSeekScanTest(Connection con, TestOptions options, Status status) { 1807 super(con, options, status); 1808 } 1809 1810 @Override 1811 boolean testRow(final long i, final long startTime) throws IOException { 1812 Scan scan = new Scan().withStartRow(getRandomRow(opts.totalRows)).setCaching(opts.caching) 1813 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1814 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1815 FilterList list = new FilterList(); 1816 for (int family = 0; family < opts.families; family++) { 1817 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1818 if (opts.addColumns) { 1819 for (int column = 0; column < opts.columns; column++) { 1820 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1821 scan.addColumn(familyName, qualifier); 1822 } 1823 } else { 1824 scan.addFamily(familyName); 1825 } 1826 } 1827 if (opts.filterAll) { 1828 list.addFilter(new FilterAllFilter()); 1829 } 1830 list.addFilter(new WhileMatchFilter(new PageFilter(120))); 1831 scan.setFilter(list); 1832 ResultScanner s = this.table.getScanner(scan); 1833 try { 1834 for (Result rr; (rr = s.next()) != null;) { 1835 updateValueSize(rr); 1836 } 1837 } finally { 1838 updateScanMetrics(s.getScanMetrics()); 1839 s.close(); 1840 } 1841 return true; 1842 } 1843 1844 @Override 1845 protected long getReportingPeriod() { 1846 long period = opts.perClientRunRows / 100; 1847 return period == 0 ? opts.perClientRunRows : period; 1848 } 1849 1850 } 1851 1852 static abstract class RandomScanWithRangeTest extends TableTest { 1853 RandomScanWithRangeTest(Connection con, TestOptions options, Status status) { 1854 super(con, options, status); 1855 } 1856 1857 @Override 1858 boolean testRow(final long i, final long startTime) throws IOException { 1859 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); 1860 Scan scan = new Scan().withStartRow(startAndStopRow.getFirst()) 1861 .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching) 1862 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1863 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1864 for (int family = 0; family < opts.families; family++) { 1865 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1866 if (opts.addColumns) { 1867 for (int column = 0; column < opts.columns; column++) { 1868 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1869 scan.addColumn(familyName, qualifier); 1870 } 1871 } else { 1872 scan.addFamily(familyName); 1873 } 1874 } 1875 if (opts.filterAll) { 1876 scan.setFilter(new FilterAllFilter()); 1877 } 1878 Result r = null; 1879 int count = 0; 1880 ResultScanner s = this.table.getScanner(scan); 1881 try { 1882 for (; (r = s.next()) != null;) { 1883 updateValueSize(r); 1884 count++; 1885 } 1886 if (i % 100 == 0) { 1887 LOG.info(String.format("Scan for key range %s - %s returned %s rows", 1888 Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()), 1889 count)); 1890 } 1891 } finally { 1892 updateScanMetrics(s.getScanMetrics()); 1893 s.close(); 1894 } 1895 return true; 1896 } 1897 1898 protected abstract Pair<byte[], byte[]> getStartAndStopRow(); 1899 1900 protected Pair<byte[], byte[]> generateStartAndStopRows(long maxRange) { 1901 long start = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE) % opts.totalRows; 1902 long stop = start + maxRange; 1903 return new Pair<>(format(start), format(stop)); 1904 } 1905 1906 @Override 1907 protected long getReportingPeriod() { 1908 long period = opts.perClientRunRows / 100; 1909 return period == 0 ? opts.perClientRunRows : period; 1910 } 1911 } 1912 1913 static class RandomScanWithRange10Test extends RandomScanWithRangeTest { 1914 RandomScanWithRange10Test(Connection con, TestOptions options, Status status) { 1915 super(con, options, status); 1916 } 1917 1918 @Override 1919 protected Pair<byte[], byte[]> getStartAndStopRow() { 1920 return generateStartAndStopRows(10); 1921 } 1922 } 1923 1924 static class RandomScanWithRange100Test extends RandomScanWithRangeTest { 1925 RandomScanWithRange100Test(Connection con, TestOptions options, Status status) { 1926 super(con, options, status); 1927 } 1928 1929 @Override 1930 protected Pair<byte[], byte[]> getStartAndStopRow() { 1931 return generateStartAndStopRows(100); 1932 } 1933 } 1934 1935 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest { 1936 RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) { 1937 super(con, options, status); 1938 } 1939 1940 @Override 1941 protected Pair<byte[], byte[]> getStartAndStopRow() { 1942 return generateStartAndStopRows(1000); 1943 } 1944 } 1945 1946 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest { 1947 RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) { 1948 super(con, options, status); 1949 } 1950 1951 @Override 1952 protected Pair<byte[], byte[]> getStartAndStopRow() { 1953 return generateStartAndStopRows(10000); 1954 } 1955 } 1956 1957 static class RandomReadTest extends TableTest { 1958 private final Consistency consistency; 1959 private ArrayList<Get> gets; 1960 1961 RandomReadTest(Connection con, TestOptions options, Status status) { 1962 super(con, options, status); 1963 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1964 if (opts.multiGet > 0) { 1965 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1966 this.gets = new ArrayList<>(opts.multiGet); 1967 } 1968 } 1969 1970 @Override 1971 boolean testRow(final long i, final long startTime) throws IOException, InterruptedException { 1972 if (opts.randomSleep > 0) { 1973 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 1974 } 1975 Get get = new Get(getRandomRow(opts.totalRows)); 1976 for (int family = 0; family < opts.families; family++) { 1977 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1978 if (opts.addColumns) { 1979 for (int column = 0; column < opts.columns; column++) { 1980 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1981 get.addColumn(familyName, qualifier); 1982 } 1983 } else { 1984 get.addFamily(familyName); 1985 } 1986 } 1987 if (opts.filterAll) { 1988 get.setFilter(new FilterAllFilter()); 1989 } 1990 get.setConsistency(consistency); 1991 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1992 if (opts.multiGet > 0) { 1993 this.gets.add(get); 1994 if (this.gets.size() == opts.multiGet) { 1995 Result[] rs = this.table.get(this.gets); 1996 if (opts.replicas > 1) { 1997 long latency = System.nanoTime() - startTime; 1998 updateValueSize(rs, latency); 1999 } else { 2000 updateValueSize(rs); 2001 } 2002 this.gets.clear(); 2003 } else { 2004 return false; 2005 } 2006 } else { 2007 if (opts.replicas > 1) { 2008 Result r = this.table.get(get); 2009 long latency = System.nanoTime() - startTime; 2010 updateValueSize(r, latency); 2011 } else { 2012 updateValueSize(this.table.get(get)); 2013 } 2014 } 2015 return true; 2016 } 2017 2018 @Override 2019 protected long getReportingPeriod() { 2020 long period = opts.perClientRunRows / 10; 2021 return period == 0 ? opts.perClientRunRows : period; 2022 } 2023 2024 @Override 2025 protected void testTakedown() throws IOException { 2026 if (this.gets != null && this.gets.size() > 0) { 2027 this.table.get(gets); 2028 this.gets.clear(); 2029 } 2030 super.testTakedown(); 2031 } 2032 } 2033 2034 /* 2035 * Send random reads against fake regions inserted by MetaWriteTest 2036 */ 2037 static class MetaRandomReadTest extends MetaTest { 2038 private RegionLocator regionLocator; 2039 2040 MetaRandomReadTest(Connection con, TestOptions options, Status status) { 2041 super(con, options, status); 2042 LOG.info("call getRegionLocation"); 2043 } 2044 2045 @Override 2046 void onStartup() throws IOException { 2047 super.onStartup(); 2048 this.regionLocator = connection.getRegionLocator(table.getName()); 2049 } 2050 2051 @Override 2052 boolean testRow(final long i, final long startTime) throws IOException, InterruptedException { 2053 if (opts.randomSleep > 0) { 2054 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 2055 } 2056 HRegionLocation hRegionLocation = regionLocator.getRegionLocation( 2057 getSplitKey(ThreadLocalRandom.current().nextLong(opts.perClientRunRows)), true); 2058 LOG.debug("get location for region: " + hRegionLocation); 2059 return true; 2060 } 2061 2062 @Override 2063 protected long getReportingPeriod() { 2064 long period = opts.perClientRunRows / 10; 2065 return period == 0 ? opts.perClientRunRows : period; 2066 } 2067 2068 @Override 2069 protected void testTakedown() throws IOException { 2070 super.testTakedown(); 2071 } 2072 } 2073 2074 static class RandomWriteTest extends SequentialWriteTest { 2075 RandomWriteTest(Connection con, TestOptions options, Status status) { 2076 super(con, options, status); 2077 } 2078 2079 @Override 2080 protected byte[] generateRow(final long i) { 2081 return getRandomRow(opts.totalRows); 2082 } 2083 2084 } 2085 2086 static class RandomDeleteTest extends SequentialDeleteTest { 2087 RandomDeleteTest(Connection con, TestOptions options, Status status) { 2088 super(con, options, status); 2089 } 2090 2091 @Override 2092 protected byte[] generateRow(final long i) { 2093 return getRandomRow(opts.totalRows); 2094 } 2095 2096 } 2097 2098 static class ScanTest extends TableTest { 2099 private ResultScanner testScanner; 2100 2101 ScanTest(Connection con, TestOptions options, Status status) { 2102 super(con, options, status); 2103 } 2104 2105 @Override 2106 void testTakedown() throws IOException { 2107 if (this.testScanner != null) { 2108 this.testScanner.close(); 2109 } 2110 super.testTakedown(); 2111 } 2112 2113 @Override 2114 boolean testRow(final long i, final long startTime) throws IOException { 2115 if (this.testScanner == null) { 2116 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 2117 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 2118 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 2119 for (int family = 0; family < opts.families; family++) { 2120 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2121 if (opts.addColumns) { 2122 for (int column = 0; column < opts.columns; column++) { 2123 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2124 scan.addColumn(familyName, qualifier); 2125 } 2126 } else { 2127 scan.addFamily(familyName); 2128 } 2129 } 2130 if (opts.filterAll) { 2131 scan.setFilter(new FilterAllFilter()); 2132 } 2133 this.testScanner = table.getScanner(scan); 2134 } 2135 Result r = testScanner.next(); 2136 updateValueSize(r); 2137 return true; 2138 } 2139 } 2140 2141 static class ReverseScanTest extends TableTest { 2142 private ResultScanner testScanner; 2143 2144 ReverseScanTest(Connection con, TestOptions options, Status status) { 2145 super(con, options, status); 2146 } 2147 2148 @Override 2149 void testTakedown() throws IOException { 2150 if (this.testScanner != null) { 2151 this.testScanner.close(); 2152 } 2153 super.testTakedown(); 2154 } 2155 2156 @Override 2157 boolean testRow(final long i, final long startTime) throws IOException { 2158 if (this.testScanner == null) { 2159 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 2160 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 2161 .setScanMetricsEnabled(true).setReversed(true); 2162 for (int family = 0; family < opts.families; family++) { 2163 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2164 if (opts.addColumns) { 2165 for (int column = 0; column < opts.columns; column++) { 2166 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2167 scan.addColumn(familyName, qualifier); 2168 } 2169 } else { 2170 scan.addFamily(familyName); 2171 } 2172 } 2173 if (opts.filterAll) { 2174 scan.setFilter(new FilterAllFilter()); 2175 } 2176 this.testScanner = table.getScanner(scan); 2177 } 2178 Result r = testScanner.next(); 2179 updateValueSize(r); 2180 return true; 2181 } 2182 } 2183 2184 /** 2185 * Base class for operations that are CAS-like; that read a value and then set it based off what 2186 * they read. In this category is increment, append, checkAndPut, etc. 2187 * <p> 2188 * These operations also want some concurrency going on. Usually when these tests run, they 2189 * operate in their own part of the key range. In CASTest, we will have them all overlap on the 2190 * same key space. We do this with our getStartRow and getLastRow overrides. 2191 */ 2192 static abstract class CASTableTest extends TableTest { 2193 private final byte[] qualifier; 2194 2195 CASTableTest(Connection con, TestOptions options, Status status) { 2196 super(con, options, status); 2197 qualifier = Bytes.toBytes(this.getClass().getSimpleName()); 2198 } 2199 2200 byte[] getQualifier() { 2201 return this.qualifier; 2202 } 2203 2204 @Override 2205 long getStartRow() { 2206 return 0; 2207 } 2208 2209 @Override 2210 long getLastRow() { 2211 return opts.perClientRunRows; 2212 } 2213 } 2214 2215 static class IncrementTest extends CASTableTest { 2216 IncrementTest(Connection con, TestOptions options, Status status) { 2217 super(con, options, status); 2218 } 2219 2220 @Override 2221 boolean testRow(final long i, final long startTime) throws IOException { 2222 Increment increment = new Increment(format(i)); 2223 // unlike checkAndXXX tests, which make most sense to do on a single value, 2224 // if multiple families are specified for an increment test we assume it is 2225 // meant to raise the work factor 2226 for (int family = 0; family < opts.families; family++) { 2227 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2228 increment.addColumn(familyName, getQualifier(), 1l); 2229 } 2230 updateValueSize(this.table.increment(increment)); 2231 return true; 2232 } 2233 } 2234 2235 static class AppendTest extends CASTableTest { 2236 AppendTest(Connection con, TestOptions options, Status status) { 2237 super(con, options, status); 2238 } 2239 2240 @Override 2241 boolean testRow(final long i, final long startTime) throws IOException { 2242 byte[] bytes = format(i); 2243 Append append = new Append(bytes); 2244 // unlike checkAndXXX tests, which make most sense to do on a single value, 2245 // if multiple families are specified for an append test we assume it is 2246 // meant to raise the work factor 2247 for (int family = 0; family < opts.families; family++) { 2248 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2249 append.addColumn(familyName, getQualifier(), bytes); 2250 } 2251 updateValueSize(this.table.append(append)); 2252 return true; 2253 } 2254 } 2255 2256 static class CheckAndMutateTest extends CASTableTest { 2257 CheckAndMutateTest(Connection con, TestOptions options, Status status) { 2258 super(con, options, status); 2259 } 2260 2261 @Override 2262 boolean testRow(final long i, final long startTime) throws IOException { 2263 final byte[] bytes = format(i); 2264 // checkAndXXX tests operate on only a single value 2265 // Put a known value so when we go to check it, it is there. 2266 Put put = new Put(bytes); 2267 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2268 this.table.put(put); 2269 RowMutations mutations = new RowMutations(bytes); 2270 mutations.add(put); 2271 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2272 .thenMutate(mutations); 2273 return true; 2274 } 2275 } 2276 2277 static class CheckAndPutTest extends CASTableTest { 2278 CheckAndPutTest(Connection con, TestOptions options, Status status) { 2279 super(con, options, status); 2280 } 2281 2282 @Override 2283 boolean testRow(final long i, final long startTime) throws IOException { 2284 final byte[] bytes = format(i); 2285 // checkAndXXX tests operate on only a single value 2286 // Put a known value so when we go to check it, it is there. 2287 Put put = new Put(bytes); 2288 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2289 this.table.put(put); 2290 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2291 .thenPut(put); 2292 return true; 2293 } 2294 } 2295 2296 static class CheckAndDeleteTest extends CASTableTest { 2297 CheckAndDeleteTest(Connection con, TestOptions options, Status status) { 2298 super(con, options, status); 2299 } 2300 2301 @Override 2302 boolean testRow(final long i, final long startTime) throws IOException { 2303 final byte[] bytes = format(i); 2304 // checkAndXXX tests operate on only a single value 2305 // Put a known value so when we go to check it, it is there. 2306 Put put = new Put(bytes); 2307 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2308 this.table.put(put); 2309 Delete delete = new Delete(put.getRow()); 2310 delete.addColumn(FAMILY_ZERO, getQualifier()); 2311 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2312 .thenDelete(delete); 2313 return true; 2314 } 2315 } 2316 2317 /* 2318 * Delete all fake regions inserted to meta table by MetaWriteTest. 2319 */ 2320 static class CleanMetaTest extends MetaTest { 2321 CleanMetaTest(Connection con, TestOptions options, Status status) { 2322 super(con, options, status); 2323 } 2324 2325 @Override 2326 boolean testRow(final long i, final long startTime) throws IOException { 2327 try { 2328 RegionInfo regionInfo = connection.getRegionLocator(table.getName()) 2329 .getRegionLocation(getSplitKey(i), false).getRegion(); 2330 LOG.debug("deleting region from meta: " + regionInfo); 2331 2332 Delete delete = 2333 MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP); 2334 try (Table t = MetaTableAccessor.getMetaHTable(connection)) { 2335 t.delete(delete); 2336 } 2337 } catch (IOException ie) { 2338 // Log and continue 2339 LOG.error("cannot find region with start key: " + i); 2340 } 2341 return true; 2342 } 2343 } 2344 2345 static class SequentialReadTest extends TableTest { 2346 SequentialReadTest(Connection con, TestOptions options, Status status) { 2347 super(con, options, status); 2348 } 2349 2350 @Override 2351 boolean testRow(final long i, final long startTime) throws IOException { 2352 Get get = new Get(format(i)); 2353 for (int family = 0; family < opts.families; family++) { 2354 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2355 if (opts.addColumns) { 2356 for (int column = 0; column < opts.columns; column++) { 2357 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2358 get.addColumn(familyName, qualifier); 2359 } 2360 } else { 2361 get.addFamily(familyName); 2362 } 2363 } 2364 if (opts.filterAll) { 2365 get.setFilter(new FilterAllFilter()); 2366 } 2367 updateValueSize(table.get(get)); 2368 return true; 2369 } 2370 } 2371 2372 static class SequentialWriteTest extends BufferedMutatorTest { 2373 private ArrayList<Put> puts; 2374 2375 SequentialWriteTest(Connection con, TestOptions options, Status status) { 2376 super(con, options, status); 2377 if (opts.multiPut > 0) { 2378 LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + "."); 2379 this.puts = new ArrayList<>(opts.multiPut); 2380 } 2381 } 2382 2383 protected byte[] generateRow(final long i) { 2384 return format(i); 2385 } 2386 2387 @Override 2388 boolean testRow(final long i, final long startTime) throws IOException { 2389 byte[] row = generateRow(i); 2390 Put put = new Put(row); 2391 for (int family = 0; family < opts.families; family++) { 2392 byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family); 2393 for (int column = 0; column < opts.columns; column++) { 2394 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2395 byte[] value = generateData(getValueLength()); 2396 if (opts.useTags) { 2397 byte[] tag = generateData(TAG_LENGTH); 2398 Tag[] tags = new Tag[opts.noOfTags]; 2399 for (int n = 0; n < opts.noOfTags; n++) { 2400 Tag t = new ArrayBackedTag((byte) n, tag); 2401 tags[n] = t; 2402 } 2403 KeyValue kv = 2404 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 2405 put.add(kv); 2406 updateValueSize(kv.getValueLength()); 2407 } else { 2408 put.addColumn(familyName, qualifier, value); 2409 updateValueSize(value.length); 2410 } 2411 } 2412 } 2413 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 2414 if (opts.autoFlush) { 2415 if (opts.multiPut > 0) { 2416 this.puts.add(put); 2417 if (this.puts.size() == opts.multiPut) { 2418 table.put(this.puts); 2419 this.puts.clear(); 2420 } else { 2421 return false; 2422 } 2423 } else { 2424 table.put(put); 2425 } 2426 } else { 2427 mutator.mutate(put); 2428 } 2429 return true; 2430 } 2431 } 2432 2433 static class SequentialDeleteTest extends BufferedMutatorTest { 2434 2435 SequentialDeleteTest(Connection con, TestOptions options, Status status) { 2436 super(con, options, status); 2437 } 2438 2439 protected byte[] generateRow(final long i) { 2440 return format(i); 2441 } 2442 2443 @Override 2444 boolean testRow(final long i, final long startTime) throws IOException { 2445 byte[] row = generateRow(i); 2446 Delete delete = new Delete(row); 2447 for (int family = 0; family < opts.families; family++) { 2448 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2449 delete.addFamily(familyName); 2450 } 2451 delete.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 2452 if (opts.autoFlush) { 2453 table.delete(delete); 2454 } else { 2455 mutator.mutate(delete); 2456 } 2457 return true; 2458 } 2459 } 2460 2461 /* 2462 * Insert fake regions into meta table with contiguous split keys. 2463 */ 2464 static class MetaWriteTest extends MetaTest { 2465 2466 MetaWriteTest(Connection con, TestOptions options, Status status) { 2467 super(con, options, status); 2468 } 2469 2470 @Override 2471 boolean testRow(final long i, final long startTime) throws IOException { 2472 List<RegionInfo> regionInfos = new ArrayList<RegionInfo>(); 2473 RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME)) 2474 .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build()); 2475 regionInfos.add(regionInfo); 2476 MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1); 2477 2478 // write the serverName columns 2479 MetaTableAccessor.updateRegionLocation(connection, regionInfo, 2480 ServerName.valueOf("localhost", 60010, ThreadLocalRandom.current().nextLong()), i, 2481 EnvironmentEdgeManager.currentTime()); 2482 return true; 2483 } 2484 } 2485 2486 static class FilteredScanTest extends TableTest { 2487 protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName()); 2488 2489 FilteredScanTest(Connection con, TestOptions options, Status status) { 2490 super(con, options, status); 2491 if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) { 2492 LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB 2493 + ". This could take a very long time."); 2494 } 2495 } 2496 2497 @Override 2498 boolean testRow(long i, final long startTime) throws IOException { 2499 byte[] value = generateData(getValueLength()); 2500 Scan scan = constructScan(value); 2501 ResultScanner scanner = null; 2502 try { 2503 scanner = this.table.getScanner(scan); 2504 for (Result r = null; (r = scanner.next()) != null;) { 2505 updateValueSize(r); 2506 } 2507 } finally { 2508 if (scanner != null) { 2509 updateScanMetrics(scanner.getScanMetrics()); 2510 scanner.close(); 2511 } 2512 } 2513 return true; 2514 } 2515 2516 protected Scan constructScan(byte[] valuePrefix) throws IOException { 2517 FilterList list = new FilterList(); 2518 Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL, 2519 new BinaryComparator(valuePrefix)); 2520 list.addFilter(filter); 2521 if (opts.filterAll) { 2522 list.addFilter(new FilterAllFilter()); 2523 } 2524 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 2525 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 2526 .setScanMetricsEnabled(true); 2527 if (opts.addColumns) { 2528 for (int column = 0; column < opts.columns; column++) { 2529 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2530 scan.addColumn(FAMILY_ZERO, qualifier); 2531 } 2532 } else { 2533 scan.addFamily(FAMILY_ZERO); 2534 } 2535 scan.setFilter(list); 2536 return scan; 2537 } 2538 } 2539 2540 /** 2541 * Compute a throughput rate in MB/s. 2542 * @param rows Number of records consumed. 2543 * @param timeMs Time taken in milliseconds. 2544 * @return String value with label, ie '123.76 MB/s' 2545 */ 2546 private static String calculateMbps(long rows, long timeMs, final int valueSize, int families, 2547 int columns) { 2548 BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH 2549 + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families); 2550 BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT) 2551 .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT); 2552 return FMT.format(mbps) + " MB/s"; 2553 } 2554 2555 /* 2556 * Format passed integer. 2557 * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed number (Does 2558 * absolute in case number is negative). 2559 */ 2560 public static byte[] format(final long number) { 2561 byte[] b = new byte[ROW_LENGTH]; 2562 long d = Math.abs(number); 2563 for (int i = b.length - 1; i >= 0; i--) { 2564 b[i] = (byte) ((d % 10) + '0'); 2565 d /= 10; 2566 } 2567 return b; 2568 } 2569 2570 /* 2571 * This method takes some time and is done inline uploading data. For example, doing the mapfile 2572 * test, generation of the key and value consumes about 30% of CPU time. 2573 * @return Generated random value to insert into a table cell. 2574 */ 2575 public static byte[] generateData(int length) { 2576 byte[] b = new byte[length]; 2577 int i; 2578 2579 Random r = ThreadLocalRandom.current(); 2580 for (i = 0; i < (length - 8); i += 8) { 2581 b[i] = (byte) (65 + r.nextInt(26)); 2582 b[i + 1] = b[i]; 2583 b[i + 2] = b[i]; 2584 b[i + 3] = b[i]; 2585 b[i + 4] = b[i]; 2586 b[i + 5] = b[i]; 2587 b[i + 6] = b[i]; 2588 b[i + 7] = b[i]; 2589 } 2590 2591 byte a = (byte) (65 + r.nextInt(26)); 2592 for (; i < length; i++) { 2593 b[i] = a; 2594 } 2595 return b; 2596 } 2597 2598 static byte[] getRandomRow(final long totalRows) { 2599 return format(generateRandomRow(totalRows)); 2600 } 2601 2602 static long generateRandomRow(final long totalRows) { 2603 return ThreadLocalRandom.current().nextLong(Long.MAX_VALUE) % totalRows; 2604 } 2605 2606 static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf, 2607 Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status) 2608 throws IOException, InterruptedException { 2609 status.setStatus( 2610 "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows"); 2611 long totalElapsedTime; 2612 2613 final TestBase t; 2614 try { 2615 if (AsyncTest.class.isAssignableFrom(cmd)) { 2616 Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd; 2617 Constructor<? extends AsyncTest> constructor = 2618 newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class); 2619 t = constructor.newInstance(asyncCon, opts, status); 2620 } else { 2621 Class<? extends Test> newCmd = (Class<? extends Test>) cmd; 2622 Constructor<? extends Test> constructor = 2623 newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class); 2624 t = constructor.newInstance(con, opts, status); 2625 } 2626 } catch (NoSuchMethodException e) { 2627 throw new IllegalArgumentException("Invalid command class: " + cmd.getName() 2628 + ". It does not provide a constructor as described by " 2629 + "the javadoc comment. Available constructors are: " 2630 + Arrays.toString(cmd.getConstructors())); 2631 } catch (Exception e) { 2632 throw new IllegalStateException("Failed to construct command class", e); 2633 } 2634 totalElapsedTime = t.test(); 2635 2636 status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow 2637 + " for " + opts.perClientRunRows + " rows" + " (" 2638 + calculateMbps((long) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime, 2639 getAverageValueLength(opts), opts.families, opts.columns) 2640 + ")"); 2641 2642 return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold, 2643 t.numOfReplyFromReplica, t.getLatencyHistogram()); 2644 } 2645 2646 private static int getAverageValueLength(final TestOptions opts) { 2647 return opts.valueRandom ? opts.valueSize / 2 : opts.valueSize; 2648 } 2649 2650 private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) 2651 throws IOException, InterruptedException, ClassNotFoundException, ExecutionException { 2652 // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do 2653 // the TestOptions introspection for us and dump the output in a readable format. 2654 LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts)); 2655 Admin admin = null; 2656 Connection connection = null; 2657 try { 2658 connection = ConnectionFactory.createConnection(getConf()); 2659 admin = connection.getAdmin(); 2660 checkTable(admin, opts); 2661 } finally { 2662 if (admin != null) admin.close(); 2663 if (connection != null) connection.close(); 2664 } 2665 if (opts.nomapred) { 2666 doLocalClients(opts, getConf()); 2667 } else { 2668 doMapReduce(opts, getConf()); 2669 } 2670 } 2671 2672 protected void printUsage() { 2673 printUsage(PE_COMMAND_SHORTNAME, null); 2674 } 2675 2676 protected static void printUsage(final String message) { 2677 printUsage(PE_COMMAND_SHORTNAME, message); 2678 } 2679 2680 protected static void printUsageAndExit(final String message, final int exitCode) { 2681 printUsage(message); 2682 System.exit(exitCode); 2683 } 2684 2685 protected static void printUsage(final String shortName, final String message) { 2686 if (message != null && message.length() > 0) { 2687 System.err.println(message); 2688 } 2689 System.err.print("Usage: hbase " + shortName); 2690 System.err.println(" <OPTIONS> [-D<property=value>]* <command|class> <nclients>"); 2691 System.err.println(); 2692 System.err.println("General Options:"); 2693 System.err.println( 2694 " nomapred Run multiple clients using threads " + "(rather than use mapreduce)"); 2695 System.err 2696 .println(" oneCon all the threads share the same connection. Default: False"); 2697 System.err.println(" connCount connections all threads share. " 2698 + "For example, if set to 2, then all thread share 2 connection. " 2699 + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, " 2700 + "if not, connCount=thread number"); 2701 2702 System.err.println(" sampleRate Execute test on a sample of total " 2703 + "rows. Only supported by randomRead. Default: 1.0"); 2704 System.err.println(" period Report every 'period' rows: " 2705 + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10); 2706 System.err.println(" cycles How many times to cycle the test. Defaults: 1."); 2707 System.err.println( 2708 " traceRate Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0"); 2709 System.err.println(" latency Set to report operation latencies. Default: False"); 2710 System.err.println(" latencyThreshold Set to report number of operations with latency " 2711 + "over lantencyThreshold, unit in millisecond, default 0"); 2712 System.err.println(" measureAfter Start to measure the latency once 'measureAfter'" 2713 + " rows have been treated. Default: 0"); 2714 System.err 2715 .println(" valueSize Pass value size to use: Default: " + DEFAULT_OPTS.getValueSize()); 2716 System.err.println(" valueRandom Set if we should vary value size between 0 and " 2717 + "'valueSize'; set on read for stats on size: Default: Not set."); 2718 System.err.println(" blockEncoding Block encoding to use. Value should be one of " 2719 + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE"); 2720 System.err.println(); 2721 System.err.println("Table Creation / Write Tests:"); 2722 System.err.println(" table Alternate table name. Default: 'TestTable'"); 2723 System.err.println( 2724 " rows Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows() 2725 + ". In case of randomReads and randomSeekScans this could" 2726 + " be specified along with --size to specify the number of rows to be scanned within" 2727 + " the total range specified by the size."); 2728 System.err.println( 2729 " size Total size in GiB. Mutually exclusive with --rows for writes and scans" 2730 + ". But for randomReads and randomSeekScans when you use size with --rows you could" 2731 + " use size to specify the end range and --rows" 2732 + " specifies the number of rows within that range. " + "Default: 1.0."); 2733 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); 2734 System.err.println(" encryption Encryption type to use (AES, ...). Default: 'NONE'"); 2735 System.err.println( 2736 " flushCommits Used to determine if the test should flush the table. " + "Default: false"); 2737 System.err.println(" valueZipf Set if we should vary value size between 0 and " 2738 + "'valueSize' in zipf form: Default: Not set."); 2739 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); 2740 System.err.println(" autoFlush Set autoFlush on htable. Default: False"); 2741 System.err.println(" multiPut Batch puts together into groups of N. Only supported " 2742 + "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0"); 2743 System.err.println(" presplit Create presplit table. If a table with same name exists," 2744 + " it'll be deleted and recreated (instead of verifying count of its existing regions). " 2745 + "Recommended for accurate perf analysis (see guide). Default: disabled"); 2746 System.err.println( 2747 " usetags Writes tags along with KVs. Use with HFile V3. " + "Default: false"); 2748 System.err.println(" numoftags Specify the no of tags that would be needed. " 2749 + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags); 2750 System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table."); 2751 System.err.println(" columns Columns to write per row. Default: 1"); 2752 System.err 2753 .println(" families Specify number of column families for the table. Default: 1"); 2754 System.err.println(); 2755 System.err.println("Read Tests:"); 2756 System.err.println(" filterAll Helps to filter out all the rows on the server side" 2757 + " there by not returning any thing back to the client. Helps to check the server side" 2758 + " performance. Uses FilterAllFilter internally. "); 2759 System.err.println(" multiGet Batch gets together into groups of N. Only supported " 2760 + "by randomRead. Default: disabled"); 2761 System.err.println(" inmemory Tries to keep the HFiles of the CF " 2762 + "inmemory as far as possible. Not guaranteed that reads are always served " 2763 + "from memory. Default: false"); 2764 System.err 2765 .println(" bloomFilter Bloom filter type, one of " + Arrays.toString(BloomType.values())); 2766 System.err.println(" blockSize Blocksize to use when writing out hfiles. "); 2767 System.err 2768 .println(" inmemoryCompaction Makes the column family to do inmemory flushes/compactions. " 2769 + "Uses the CompactingMemstore"); 2770 System.err.println(" addColumns Adds columns to scans/gets explicitly. Default: true"); 2771 System.err.println(" replicas Enable region replica testing. Defaults: 1."); 2772 System.err.println( 2773 " randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0"); 2774 System.err.println(" caching Scan caching to use. Default: 30"); 2775 System.err.println(" asyncPrefetch Enable asyncPrefetch for scan"); 2776 System.err.println(" cacheBlocks Set the cacheBlocks option for scan. Default: true"); 2777 System.err.println( 2778 " scanReadType Set the readType option for scan, stream/pread/default. Default: default"); 2779 System.err.println(" bufferSize Set the value of client side buffering. Default: 2MB"); 2780 System.err.println(); 2781 System.err.println(" Note: -D properties will be applied to the conf used. "); 2782 System.err.println(" For example: "); 2783 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true"); 2784 System.err.println(" -Dmapreduce.task.timeout=60000"); 2785 System.err.println(); 2786 System.err.println("Command:"); 2787 for (CmdDescriptor command : COMMANDS.values()) { 2788 System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription())); 2789 } 2790 System.err.println(); 2791 System.err.println("Class:"); 2792 System.err.println("To run any custom implementation of PerformanceEvaluation.Test, " 2793 + "provide the classname of the implementaion class in place of " 2794 + "command name and it will be loaded at runtime from classpath.:"); 2795 System.err.println("Please consider to contribute back " 2796 + "this custom test impl into a builtin PE command for the benefit of the community"); 2797 System.err.println(); 2798 System.err.println("Args:"); 2799 System.err.println(" nclients Integer. Required. Total number of clients " 2800 + "(and HRegionServers) running. 1 <= value <= 500"); 2801 System.err.println("Examples:"); 2802 System.err.println(" To run a single client doing the default 1M sequentialWrites:"); 2803 System.err.println(" $ hbase " + shortName + " sequentialWrite 1"); 2804 System.err.println(" To run 10 clients doing increments over ten rows:"); 2805 System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10"); 2806 } 2807 2808 /** 2809 * Parse options passed in via an arguments array. Assumes that array has been split on 2810 * white-space and placed into a {@code Queue}. Any unknown arguments will remain in the queue at 2811 * the conclusion of this method call. It's up to the caller to deal with these unrecognized 2812 * arguments. 2813 */ 2814 static TestOptions parseOpts(Queue<String> args) { 2815 TestOptions opts = new TestOptions(); 2816 2817 String cmd = null; 2818 while ((cmd = args.poll()) != null) { 2819 if (cmd.equals("-h") || cmd.startsWith("--h")) { 2820 // place item back onto queue so that caller knows parsing was incomplete 2821 args.add(cmd); 2822 break; 2823 } 2824 2825 final String nmr = "--nomapred"; 2826 if (cmd.startsWith(nmr)) { 2827 opts.nomapred = true; 2828 continue; 2829 } 2830 2831 final String rows = "--rows="; 2832 if (cmd.startsWith(rows)) { 2833 opts.perClientRunRows = Long.parseLong(cmd.substring(rows.length())); 2834 continue; 2835 } 2836 2837 final String cycles = "--cycles="; 2838 if (cmd.startsWith(cycles)) { 2839 opts.cycles = Integer.parseInt(cmd.substring(cycles.length())); 2840 continue; 2841 } 2842 2843 final String sampleRate = "--sampleRate="; 2844 if (cmd.startsWith(sampleRate)) { 2845 opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); 2846 continue; 2847 } 2848 2849 final String table = "--table="; 2850 if (cmd.startsWith(table)) { 2851 opts.tableName = cmd.substring(table.length()); 2852 continue; 2853 } 2854 2855 final String startRow = "--startRow="; 2856 if (cmd.startsWith(startRow)) { 2857 opts.startRow = Long.parseLong(cmd.substring(startRow.length())); 2858 continue; 2859 } 2860 2861 final String compress = "--compress="; 2862 if (cmd.startsWith(compress)) { 2863 opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); 2864 continue; 2865 } 2866 2867 final String encryption = "--encryption="; 2868 if (cmd.startsWith(encryption)) { 2869 opts.encryption = cmd.substring(encryption.length()); 2870 continue; 2871 } 2872 2873 final String traceRate = "--traceRate="; 2874 if (cmd.startsWith(traceRate)) { 2875 opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length())); 2876 continue; 2877 } 2878 2879 final String blockEncoding = "--blockEncoding="; 2880 if (cmd.startsWith(blockEncoding)) { 2881 opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); 2882 continue; 2883 } 2884 2885 final String flushCommits = "--flushCommits="; 2886 if (cmd.startsWith(flushCommits)) { 2887 opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); 2888 continue; 2889 } 2890 2891 final String writeToWAL = "--writeToWAL="; 2892 if (cmd.startsWith(writeToWAL)) { 2893 opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); 2894 continue; 2895 } 2896 2897 final String presplit = "--presplit="; 2898 if (cmd.startsWith(presplit)) { 2899 opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); 2900 continue; 2901 } 2902 2903 final String inMemory = "--inmemory="; 2904 if (cmd.startsWith(inMemory)) { 2905 opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); 2906 continue; 2907 } 2908 2909 final String autoFlush = "--autoFlush="; 2910 if (cmd.startsWith(autoFlush)) { 2911 opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length())); 2912 continue; 2913 } 2914 2915 final String onceCon = "--oneCon="; 2916 if (cmd.startsWith(onceCon)) { 2917 opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length())); 2918 continue; 2919 } 2920 2921 final String connCount = "--connCount="; 2922 if (cmd.startsWith(connCount)) { 2923 opts.connCount = Integer.parseInt(cmd.substring(connCount.length())); 2924 continue; 2925 } 2926 2927 final String latencyThreshold = "--latencyThreshold="; 2928 if (cmd.startsWith(latencyThreshold)) { 2929 opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length())); 2930 continue; 2931 } 2932 2933 final String latency = "--latency"; 2934 if (cmd.startsWith(latency)) { 2935 opts.reportLatency = true; 2936 continue; 2937 } 2938 2939 final String multiGet = "--multiGet="; 2940 if (cmd.startsWith(multiGet)) { 2941 opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); 2942 continue; 2943 } 2944 2945 final String multiPut = "--multiPut="; 2946 if (cmd.startsWith(multiPut)) { 2947 opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length())); 2948 continue; 2949 } 2950 2951 final String useTags = "--usetags="; 2952 if (cmd.startsWith(useTags)) { 2953 opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); 2954 continue; 2955 } 2956 2957 final String noOfTags = "--numoftags="; 2958 if (cmd.startsWith(noOfTags)) { 2959 opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); 2960 continue; 2961 } 2962 2963 final String replicas = "--replicas="; 2964 if (cmd.startsWith(replicas)) { 2965 opts.replicas = Integer.parseInt(cmd.substring(replicas.length())); 2966 continue; 2967 } 2968 2969 final String filterOutAll = "--filterAll"; 2970 if (cmd.startsWith(filterOutAll)) { 2971 opts.filterAll = true; 2972 continue; 2973 } 2974 2975 final String size = "--size="; 2976 if (cmd.startsWith(size)) { 2977 opts.size = Float.parseFloat(cmd.substring(size.length())); 2978 if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB"); 2979 continue; 2980 } 2981 2982 final String splitPolicy = "--splitPolicy="; 2983 if (cmd.startsWith(splitPolicy)) { 2984 opts.splitPolicy = cmd.substring(splitPolicy.length()); 2985 continue; 2986 } 2987 2988 final String randomSleep = "--randomSleep="; 2989 if (cmd.startsWith(randomSleep)) { 2990 opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length())); 2991 continue; 2992 } 2993 2994 final String measureAfter = "--measureAfter="; 2995 if (cmd.startsWith(measureAfter)) { 2996 opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length())); 2997 continue; 2998 } 2999 3000 final String bloomFilter = "--bloomFilter="; 3001 if (cmd.startsWith(bloomFilter)) { 3002 opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length())); 3003 continue; 3004 } 3005 3006 final String blockSize = "--blockSize="; 3007 if (cmd.startsWith(blockSize)) { 3008 opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length())); 3009 continue; 3010 } 3011 3012 final String valueSize = "--valueSize="; 3013 if (cmd.startsWith(valueSize)) { 3014 opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length())); 3015 continue; 3016 } 3017 3018 final String valueRandom = "--valueRandom"; 3019 if (cmd.startsWith(valueRandom)) { 3020 opts.valueRandom = true; 3021 continue; 3022 } 3023 3024 final String valueZipf = "--valueZipf"; 3025 if (cmd.startsWith(valueZipf)) { 3026 opts.valueZipf = true; 3027 continue; 3028 } 3029 3030 final String period = "--period="; 3031 if (cmd.startsWith(period)) { 3032 opts.period = Integer.parseInt(cmd.substring(period.length())); 3033 continue; 3034 } 3035 3036 final String addColumns = "--addColumns="; 3037 if (cmd.startsWith(addColumns)) { 3038 opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length())); 3039 continue; 3040 } 3041 3042 final String inMemoryCompaction = "--inmemoryCompaction="; 3043 if (cmd.startsWith(inMemoryCompaction)) { 3044 opts.inMemoryCompaction = 3045 MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length())); 3046 continue; 3047 } 3048 3049 final String columns = "--columns="; 3050 if (cmd.startsWith(columns)) { 3051 opts.columns = Integer.parseInt(cmd.substring(columns.length())); 3052 continue; 3053 } 3054 3055 final String families = "--families="; 3056 if (cmd.startsWith(families)) { 3057 opts.families = Integer.parseInt(cmd.substring(families.length())); 3058 continue; 3059 } 3060 3061 final String caching = "--caching="; 3062 if (cmd.startsWith(caching)) { 3063 opts.caching = Integer.parseInt(cmd.substring(caching.length())); 3064 continue; 3065 } 3066 3067 final String asyncPrefetch = "--asyncPrefetch"; 3068 if (cmd.startsWith(asyncPrefetch)) { 3069 opts.asyncPrefetch = true; 3070 continue; 3071 } 3072 3073 final String cacheBlocks = "--cacheBlocks="; 3074 if (cmd.startsWith(cacheBlocks)) { 3075 opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length())); 3076 continue; 3077 } 3078 3079 final String scanReadType = "--scanReadType="; 3080 if (cmd.startsWith(scanReadType)) { 3081 opts.scanReadType = 3082 Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase()); 3083 continue; 3084 } 3085 3086 final String bufferSize = "--bufferSize="; 3087 if (cmd.startsWith(bufferSize)) { 3088 opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length())); 3089 continue; 3090 } 3091 3092 final String commandPropertiesFile = "--commandPropertiesFile="; 3093 if (cmd.startsWith(commandPropertiesFile)) { 3094 String fileName = String.valueOf(cmd.substring(commandPropertiesFile.length())); 3095 Properties properties = new Properties(); 3096 try { 3097 properties 3098 .load(PerformanceEvaluation.class.getClassLoader().getResourceAsStream(fileName)); 3099 opts.commandProperties = properties; 3100 } catch (IOException e) { 3101 LOG.error("Failed to load metricIds from properties file", e); 3102 } 3103 continue; 3104 } 3105 3106 validateParsedOpts(opts); 3107 3108 if (isCommandClass(cmd)) { 3109 opts.cmdName = cmd; 3110 try { 3111 opts.numClientThreads = Integer.parseInt(args.remove()); 3112 } catch (NoSuchElementException | NumberFormatException e) { 3113 throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e); 3114 } 3115 opts = calculateRowsAndSize(opts); 3116 break; 3117 } else { 3118 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); 3119 } 3120 3121 // Not matching any option or command. 3122 System.err.println("Error: Wrong option or command: " + cmd); 3123 args.add(cmd); 3124 break; 3125 } 3126 return opts; 3127 } 3128 3129 /** 3130 * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts 3131 */ 3132 private static void validateParsedOpts(TestOptions opts) { 3133 3134 if (!opts.autoFlush && opts.multiPut > 0) { 3135 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0"); 3136 } 3137 3138 if (opts.oneCon && opts.connCount > 1) { 3139 throw new IllegalArgumentException( 3140 "oneCon is set to true, " + "connCount should not bigger than 1"); 3141 } 3142 3143 if (opts.valueZipf && opts.valueRandom) { 3144 throw new IllegalStateException("Either valueZipf or valueRandom but not both"); 3145 } 3146 } 3147 3148 static TestOptions calculateRowsAndSize(final TestOptions opts) { 3149 int rowsPerGB = getRowsPerGB(opts); 3150 if ( 3151 (opts.getCmdName() != null 3152 && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN))) 3153 && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows 3154 ) { 3155 opts.totalRows = (long) (opts.size * rowsPerGB); 3156 } else if (opts.size != DEFAULT_OPTS.size) { 3157 // total size in GB specified 3158 opts.totalRows = (long) (opts.size * rowsPerGB); 3159 opts.perClientRunRows = opts.totalRows / opts.numClientThreads; 3160 } else { 3161 opts.totalRows = opts.perClientRunRows * opts.numClientThreads; 3162 // Cast to float to ensure floating-point division 3163 opts.size = (float) opts.totalRows / rowsPerGB; 3164 } 3165 return opts; 3166 } 3167 3168 static int getRowsPerGB(final TestOptions opts) { 3169 return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies() 3170 * opts.getColumns()); 3171 } 3172 3173 @Override 3174 public int run(String[] args) throws Exception { 3175 // Process command-line args. TODO: Better cmd-line processing 3176 // (but hopefully something not as painful as cli options). 3177 int errCode = -1; 3178 if (args.length < 1) { 3179 printUsage(); 3180 return errCode; 3181 } 3182 3183 try { 3184 LinkedList<String> argv = new LinkedList<>(); 3185 argv.addAll(Arrays.asList(args)); 3186 TestOptions opts = parseOpts(argv); 3187 3188 // args remaining, print help and exit 3189 if (!argv.isEmpty()) { 3190 errCode = 0; 3191 printUsage(); 3192 return errCode; 3193 } 3194 3195 // must run at least 1 client 3196 if (opts.numClientThreads <= 0) { 3197 throw new IllegalArgumentException("Number of clients must be > 0"); 3198 } 3199 3200 // cmdName should not be null, print help and exit 3201 if (opts.cmdName == null) { 3202 printUsage(); 3203 return errCode; 3204 } 3205 3206 Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName); 3207 if (cmdClass != null) { 3208 runTest(cmdClass, opts); 3209 errCode = 0; 3210 } 3211 3212 } catch (Exception e) { 3213 e.printStackTrace(); 3214 } 3215 3216 return errCode; 3217 } 3218 3219 private static boolean isCommandClass(String cmd) { 3220 return COMMANDS.containsKey(cmd) || isCustomTestClass(cmd); 3221 } 3222 3223 private static boolean isCustomTestClass(String cmd) { 3224 Class<? extends Test> cmdClass; 3225 try { 3226 cmdClass = 3227 (Class<? extends Test>) PerformanceEvaluation.class.getClassLoader().loadClass(cmd); 3228 addCommandDescriptor(cmdClass, cmd, "custom command"); 3229 return true; 3230 } catch (Throwable th) { 3231 LOG.info("No class found for command: " + cmd, th); 3232 return false; 3233 } 3234 } 3235 3236 private static Class<? extends TestBase> determineCommandClass(String cmd) { 3237 CmdDescriptor descriptor = COMMANDS.get(cmd); 3238 return descriptor != null ? descriptor.getCmdClass() : null; 3239 } 3240 3241 public static void main(final String[] args) throws Exception { 3242 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); 3243 System.exit(res); 3244 } 3245}