001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import com.codahale.metrics.Histogram; 021import com.codahale.metrics.UniformReservoir; 022import io.opentelemetry.api.trace.Span; 023import io.opentelemetry.context.Scope; 024import java.io.IOException; 025import java.io.PrintStream; 026import java.lang.reflect.Constructor; 027import java.math.BigDecimal; 028import java.math.MathContext; 029import java.text.DecimalFormat; 030import java.text.SimpleDateFormat; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.Date; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Locale; 037import java.util.Map; 038import java.util.NoSuchElementException; 039import java.util.Queue; 040import java.util.Random; 041import java.util.TreeMap; 042import java.util.concurrent.Callable; 043import java.util.concurrent.ExecutionException; 044import java.util.concurrent.ExecutorService; 045import java.util.concurrent.Executors; 046import java.util.concurrent.Future; 047import java.util.concurrent.ThreadLocalRandom; 048import org.apache.commons.lang3.StringUtils; 049import org.apache.hadoop.conf.Configuration; 050import org.apache.hadoop.conf.Configured; 051import org.apache.hadoop.fs.FileSystem; 052import org.apache.hadoop.fs.Path; 053import org.apache.hadoop.hbase.client.Admin; 054import org.apache.hadoop.hbase.client.Append; 055import org.apache.hadoop.hbase.client.AsyncConnection; 056import org.apache.hadoop.hbase.client.AsyncTable; 057import org.apache.hadoop.hbase.client.BufferedMutator; 058import org.apache.hadoop.hbase.client.BufferedMutatorParams; 059import org.apache.hadoop.hbase.client.Connection; 060import org.apache.hadoop.hbase.client.ConnectionFactory; 061import org.apache.hadoop.hbase.client.Consistency; 062import org.apache.hadoop.hbase.client.Delete; 063import org.apache.hadoop.hbase.client.Durability; 064import org.apache.hadoop.hbase.client.Get; 065import org.apache.hadoop.hbase.client.Increment; 066import org.apache.hadoop.hbase.client.Put; 067import org.apache.hadoop.hbase.client.RegionInfo; 068import org.apache.hadoop.hbase.client.RegionInfoBuilder; 069import org.apache.hadoop.hbase.client.RegionLocator; 070import org.apache.hadoop.hbase.client.Result; 071import org.apache.hadoop.hbase.client.ResultScanner; 072import org.apache.hadoop.hbase.client.RowMutations; 073import org.apache.hadoop.hbase.client.Scan; 074import org.apache.hadoop.hbase.client.Table; 075import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 076import org.apache.hadoop.hbase.filter.BinaryComparator; 077import org.apache.hadoop.hbase.filter.Filter; 078import org.apache.hadoop.hbase.filter.FilterAllFilter; 079import 
org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ByteArrayHashKey;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.hadoop.hbase.util.Hash;
import org.apache.hadoop.hbase.util.MurmurHash;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RandomDistribution;
import org.apache.hadoop.hbase.util.YammerHistogramUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.gson.Gson;

/**
 * Script used to evaluate HBase performance and scalability. Runs an HBase client that steps
 * through one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random
 * writes test, etc.). Pass on the command-line which test to run and how many clients are
 * participating in this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
 * <p>
 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance
 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper,
 * pages 8-10.
 * <p>
 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as
 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client does
 * about 1GB of data, unless specified otherwise.
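 * <p>
 * As a hypothetical example (assuming the standard {@code hbase} launcher script is available), a
 * multithreaded random-read run over previously written rows might look like:
 * {@code hbase pe --nomapred --rows=100000 randomRead 10}, where the trailing argument is the
 * number of client threads.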
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class PerformanceEvaluation extends Configured implements Tool {
  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
  static final String RANDOM_READ = "randomRead";
  static final String PE_COMMAND_SHORTNAME = "pe";
  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
  private static final Gson GSON = GsonUtil.createGson().create();

  public static final String TABLE_NAME = "TestTable";
  public static final String FAMILY_NAME_BASE = "info";
  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
  public static final int DEFAULT_VALUE_LENGTH = 1000;
  public static final int ROW_LENGTH = 26;

  private static final int ONE_GB = 1024 * 1024 * 1000;
  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
  // TODO: should we make this configurable?
  private static final int TAG_LENGTH = 256;
  private static final DecimalFormat FMT = new DecimalFormat("0.##");
  private static final MathContext CXT = MathContext.DECIMAL64;
  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
  private static final TestOptions DEFAULT_OPTS = new TestOptions();

  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");

  static {
    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
      "Run async random read test");
    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
      "Run async random write test");
    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
      "Run async sequential read test");
    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
      "Run async sequential write test");
    addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)");
    addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test");
    addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test");
    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
      "Run random seek and scan 100 test");
    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
      "Run random seek scan with both start and stop row (max 10 rows)");
    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
      "Run random seek scan with both start and stop row (max 100 rows)");
    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
      "Run random seek scan with both start and stop row (max 1000 rows)");
    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
      "Run random seek scan with both start and stop row (max 10000 rows)");
    addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
    addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
    addCommandDescriptor(MetaWriteTest.class, "metaWrite",
      "Populate meta table; used with 1 thread; to be cleaned up by cleanMeta");
    addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
scan test (read every row)"); 185 addCommandDescriptor(ReverseScanTest.class, "reverseScan", 186 "Run reverse scan test (read every row)"); 187 addCommandDescriptor(FilteredScanTest.class, "filterScan", 188 "Run scan test using a filter to find a specific row based on it's value " 189 + "(make sure to use --rows=20)"); 190 addCommandDescriptor(IncrementTest.class, "increment", 191 "Increment on each row; clients overlap on keyspace so some concurrent operations"); 192 addCommandDescriptor(AppendTest.class, "append", 193 "Append on each row; clients overlap on keyspace so some concurrent operations"); 194 addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate", 195 "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations"); 196 addCommandDescriptor(CheckAndPutTest.class, "checkAndPut", 197 "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations"); 198 addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete", 199 "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations"); 200 addCommandDescriptor(CleanMetaTest.class, "cleanMeta", 201 "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread"); 202 } 203 204 /** 205 * Enum for map metrics. Keep it out here rather than inside in the Map inner-class so we can find 206 * associated properties. 207 */ 208 protected static enum Counter { 209 /** elapsed time */ 210 ELAPSED_TIME, 211 /** number of rows */ 212 ROWS 213 } 214 215 protected static class RunResult implements Comparable<RunResult> { 216 public RunResult(long duration, Histogram hist) { 217 this.duration = duration; 218 this.hist = hist; 219 numbOfReplyOverThreshold = 0; 220 numOfReplyFromReplica = 0; 221 } 222 223 public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica, 224 Histogram hist) { 225 this.duration = duration; 226 this.hist = hist; 227 this.numbOfReplyOverThreshold = numbOfReplyOverThreshold; 228 this.numOfReplyFromReplica = numOfReplyFromReplica; 229 } 230 231 public final long duration; 232 public final Histogram hist; 233 public final long numbOfReplyOverThreshold; 234 public final long numOfReplyFromReplica; 235 236 @Override 237 public String toString() { 238 return Long.toString(duration); 239 } 240 241 @Override 242 public int compareTo(RunResult o) { 243 return Long.compare(this.duration, o.duration); 244 } 245 } 246 247 /** 248 * Constructor 249 * @param conf Configuration object 250 */ 251 public PerformanceEvaluation(final Configuration conf) { 252 super(conf); 253 } 254 255 protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name, 256 String description) { 257 CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description); 258 COMMANDS.put(name, cmdDescriptor); 259 } 260 261 /** 262 * Implementations can have their status set. 263 */ 264 interface Status { 265 /** 266 * Sets status 267 * @param msg status message 268 */ 269 void setStatus(final String msg) throws IOException; 270 } 271 272 /** 273 * MapReduce job that runs a performance evaluation client in each map task. 
274 */ 275 public static class EvaluationMapTask 276 extends Mapper<LongWritable, Text, LongWritable, LongWritable> { 277 278 /** configuration parameter name that contains the command */ 279 public final static String CMD_KEY = "EvaluationMapTask.command"; 280 /** configuration parameter name that contains the PE impl */ 281 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl"; 282 283 private Class<? extends Test> cmd; 284 285 @Override 286 protected void setup(Context context) throws IOException, InterruptedException { 287 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class); 288 289 // this is required so that extensions of PE are instantiated within the 290 // map reduce task... 291 Class<? extends PerformanceEvaluation> peClass = 292 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class); 293 try { 294 peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration()); 295 } catch (Exception e) { 296 throw new IllegalStateException("Could not instantiate PE instance", e); 297 } 298 } 299 300 private <Type> Class<? extends Type> forName(String className, Class<Type> type) { 301 try { 302 return Class.forName(className).asSubclass(type); 303 } catch (ClassNotFoundException e) { 304 throw new IllegalStateException("Could not find class for name: " + className, e); 305 } 306 } 307 308 @Override 309 protected void map(LongWritable key, Text value, final Context context) 310 throws IOException, InterruptedException { 311 312 Status status = new Status() { 313 @Override 314 public void setStatus(String msg) { 315 context.setStatus(msg); 316 } 317 }; 318 319 TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class); 320 Configuration conf = HBaseConfiguration.create(context.getConfiguration()); 321 final Connection con = ConnectionFactory.createConnection(conf); 322 AsyncConnection asyncCon = null; 323 try { 324 asyncCon = ConnectionFactory.createAsyncConnection(conf).get(); 325 } catch (ExecutionException e) { 326 throw new IOException(e); 327 } 328 329 // Evaluation task 330 RunResult result = 331 PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status); 332 // Collect how much time the thing took. Report as map output and 333 // to the ELAPSED_TIME counter. 334 context.getCounter(Counter.ELAPSED_TIME).increment(result.duration); 335 context.getCounter(Counter.ROWS).increment(opts.perClientRunRows); 336 context.write(new LongWritable(opts.startRow), new LongWritable(result.duration)); 337 context.progress(); 338 } 339 } 340 341 /* 342 * If table does not already exist, create. Also create a table when {@code opts.presplitRegions} 343 * is specified or when the existing table's region replica count doesn't match {@code 344 * opts.replicas}. 345 */ 346 static boolean checkTable(Admin admin, TestOptions opts) throws IOException { 347 TableName tableName = TableName.valueOf(opts.tableName); 348 boolean needsDelete = false, exists = admin.tableExists(tableName); 349 boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read") 350 || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan"); 351 if (!exists && isReadCmd) { 352 throw new IllegalStateException( 353 "Must specify an existing table for read commands. Run a write command first."); 354 } 355 HTableDescriptor desc = 356 exists ? 
admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null; 357 byte[][] splits = getSplits(opts); 358 359 // recreate the table when user has requested presplit or when existing 360 // {RegionSplitPolicy,replica count} does not match requested, or when the 361 // number of column families does not match requested. 362 if ( 363 (exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions) 364 || (!isReadCmd && desc != null 365 && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy)) 366 || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas) 367 || (desc != null && desc.getColumnFamilyCount() != opts.families) 368 ) { 369 needsDelete = true; 370 // wait, why did it delete my table?!? 371 LOG.debug(MoreObjects.toStringHelper("needsDelete").add("needsDelete", needsDelete) 372 .add("isReadCmd", isReadCmd).add("exists", exists).add("desc", desc) 373 .add("presplit", opts.presplitRegions).add("splitPolicy", opts.splitPolicy) 374 .add("replicas", opts.replicas).add("families", opts.families).toString()); 375 } 376 377 // remove an existing table 378 if (needsDelete) { 379 if (admin.isTableEnabled(tableName)) { 380 admin.disableTable(tableName); 381 } 382 admin.deleteTable(tableName); 383 } 384 385 // table creation is necessary 386 if (!exists || needsDelete) { 387 desc = getTableDescriptor(opts); 388 if (splits != null) { 389 if (LOG.isDebugEnabled()) { 390 for (int i = 0; i < splits.length; i++) { 391 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); 392 } 393 } 394 } 395 admin.createTable(desc, splits); 396 LOG.info("Table " + desc + " created"); 397 } 398 return admin.tableExists(tableName); 399 } 400 401 /** 402 * Create an HTableDescriptor from provided TestOptions. 403 */ 404 protected static HTableDescriptor getTableDescriptor(TestOptions opts) { 405 HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(opts.tableName)); 406 for (int family = 0; family < opts.families; family++) { 407 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 408 HColumnDescriptor familyDesc = new HColumnDescriptor(familyName); 409 familyDesc.setDataBlockEncoding(opts.blockEncoding); 410 familyDesc.setCompressionType(opts.compression); 411 familyDesc.setEncryptionType(opts.encryption); 412 familyDesc.setBloomFilterType(opts.bloomType); 413 familyDesc.setBlocksize(opts.blockSize); 414 if (opts.inMemoryCF) { 415 familyDesc.setInMemory(true); 416 } 417 familyDesc.setInMemoryCompaction(opts.inMemoryCompaction); 418 tableDesc.addFamily(familyDesc); 419 } 420 if (opts.replicas != DEFAULT_OPTS.replicas) { 421 tableDesc.setRegionReplication(opts.replicas); 422 } 423 if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) { 424 tableDesc.setRegionSplitPolicyClassName(opts.splitPolicy); 425 } 426 return tableDesc; 427 } 428 429 /** 430 * generates splits based on total number of rows and specified split regions 431 */ 432 protected static byte[][] getSplits(TestOptions opts) { 433 if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null; 434 435 int numSplitPoints = opts.presplitRegions - 1; 436 byte[][] splits = new byte[numSplitPoints][]; 437 int jump = opts.totalRows / opts.presplitRegions; 438 for (int i = 0; i < numSplitPoints; i++) { 439 int rowkey = jump * (1 + i); 440 splits[i] = format(rowkey); 441 } 442 return splits; 443 } 444 445 static void setupConnectionCount(final TestOptions opts) { 446 if (opts.oneCon) { 447 opts.connCount = 1; 448 } else { 449 if (opts.connCount == 
-1) { 450 // set to thread number if connCount is not set 451 opts.connCount = opts.numClientThreads; 452 } 453 } 454 } 455 456 /* 457 * Run all clients in this vm each to its own thread. 458 */ 459 static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf) 460 throws IOException, InterruptedException, ExecutionException { 461 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName); 462 assert cmd != null; 463 @SuppressWarnings("unchecked") 464 Future<RunResult>[] threads = new Future[opts.numClientThreads]; 465 RunResult[] results = new RunResult[opts.numClientThreads]; 466 ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, 467 new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build()); 468 setupConnectionCount(opts); 469 final Connection[] cons = new Connection[opts.connCount]; 470 final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount]; 471 for (int i = 0; i < opts.connCount; i++) { 472 cons[i] = ConnectionFactory.createConnection(conf); 473 asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get(); 474 } 475 LOG 476 .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads"); 477 for (int i = 0; i < threads.length; i++) { 478 final int index = i; 479 threads[i] = pool.submit(new Callable<RunResult>() { 480 @Override 481 public RunResult call() throws Exception { 482 TestOptions threadOpts = new TestOptions(opts); 483 final Connection con = cons[index % cons.length]; 484 final AsyncConnection asyncCon = asyncCons[index % asyncCons.length]; 485 if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows; 486 RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() { 487 @Override 488 public void setStatus(final String msg) throws IOException { 489 LOG.info(msg); 490 } 491 }); 492 LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration 493 + "ms over " + threadOpts.perClientRunRows + " rows"); 494 if (opts.latencyThreshold > 0) { 495 LOG.info("Number of replies over latency threshold " + opts.latencyThreshold 496 + "(ms) is " + run.numbOfReplyOverThreshold); 497 } 498 return run; 499 } 500 }); 501 } 502 pool.shutdown(); 503 504 for (int i = 0; i < threads.length; i++) { 505 try { 506 results[i] = threads[i].get(); 507 } catch (ExecutionException e) { 508 throw new IOException(e.getCause()); 509 } 510 } 511 final String test = cmd.getSimpleName(); 512 LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results)); 513 Arrays.sort(results); 514 long total = 0; 515 float avgLatency = 0; 516 float avgTPS = 0; 517 long replicaWins = 0; 518 for (RunResult result : results) { 519 total += result.duration; 520 avgLatency += result.hist.getSnapshot().getMean(); 521 avgTPS += opts.perClientRunRows * 1.0f / result.duration; 522 replicaWins += result.numOfReplyFromReplica; 523 } 524 avgTPS *= 1000; // ms to second 525 avgLatency = avgLatency / results.length; 526 LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: " 527 + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms"); 528 LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency)); 529 LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second"); 530 if (opts.replicas > 1) { 531 LOG.info("[results from replica regions] " + replicaWins); 532 } 533 534 for (int i = 0; i < opts.connCount; i++) { 535 cons[i].close(); 536 asyncCons[i].close(); 537 } 538 539 
    return results;
  }

  /*
   * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write
   * out an input file with one instruction per client stating which row it is to start on.
   * @param opts Test options (determines the command to run).
   * @param conf Configuration to use.
   */
  static Job doMapReduce(TestOptions opts, final Configuration conf)
    throws IOException, InterruptedException, ClassNotFoundException {
    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
    assert cmd != null;
    Path inputDir = writeInputFile(conf, opts);
    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
    Job job = Job.getInstance(conf);
    job.setJarByClass(PerformanceEvaluation.class);
    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);

    job.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.setInputPaths(job, inputDir);
    // this is the default, but be explicit about it just in case.
    NLineInputFormat.setNumLinesPerSplit(job, 1);

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(LongWritable.class);

    job.setMapperClass(EvaluationMapTask.class);
    job.setReducerClass(LongSumReducer.class);

    job.setNumReduceTasks(1);

    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
      Histogram.class, // yammer metrics
      Gson.class, // gson
      FilterAllFilter.class // hbase-server tests jar
    );

    TableMapReduceUtil.initCredentials(job);

    job.waitForCompletion(true);
    return job;
  }

  /**
   * Each client is run by one map task; results are aggregated by a single reduce task.
   */

  static String JOB_INPUT_FILENAME = "input.txt";

  /*
   * Write input file of offsets-per-client for the mapreduce job.
   * @param c Configuration
   * @return Directory that contains the file written, whose name is JOB_INPUT_FILENAME
   */
  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
    return writeInputFile(c, opts, new Path("."));
  }

  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
    throws IOException {
    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
    Path inputDir = new Path(jobdir, "inputs");

    FileSystem fs = FileSystem.get(c);
    fs.mkdirs(inputDir);

    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
    PrintStream out = new PrintStream(fs.create(inputFile));
    // Make input random.
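    // (Each client's serialized TestOptions line is keyed by its murmur hash in the TreeMap below,
    // so lines are written out in hash order rather than client order, randomizing which map task
    // picks up which start row.)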
614 Map<Integer, String> m = new TreeMap<>(); 615 Hash h = MurmurHash.getInstance(); 616 int perClientRows = (opts.totalRows / opts.numClientThreads); 617 try { 618 for (int j = 0; j < opts.numClientThreads; j++) { 619 TestOptions next = new TestOptions(opts); 620 next.startRow = j * perClientRows; 621 next.perClientRunRows = perClientRows; 622 String s = GSON.toJson(next); 623 LOG.info("Client=" + j + ", input=" + s); 624 byte[] b = Bytes.toBytes(s); 625 int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1); 626 m.put(hash, s); 627 } 628 for (Map.Entry<Integer, String> e : m.entrySet()) { 629 out.println(e.getValue()); 630 } 631 } finally { 632 out.close(); 633 } 634 return inputDir; 635 } 636 637 /** 638 * Describes a command. 639 */ 640 static class CmdDescriptor { 641 private Class<? extends TestBase> cmdClass; 642 private String name; 643 private String description; 644 645 CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) { 646 this.cmdClass = cmdClass; 647 this.name = name; 648 this.description = description; 649 } 650 651 public Class<? extends TestBase> getCmdClass() { 652 return cmdClass; 653 } 654 655 public String getName() { 656 return name; 657 } 658 659 public String getDescription() { 660 return description; 661 } 662 } 663 664 /** 665 * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes 666 * tracking all these arguments a little easier. NOTE: ADDING AN OPTION, you need to add a data 667 * member, a getter/setter (to make JSON serialization of this TestOptions class behave), and you 668 * need to add to the clone constructor below copying your new option from the 'that' to the 669 * 'this'. Look for 'clone' below. 670 */ 671 static class TestOptions { 672 String cmdName = null; 673 boolean nomapred = false; 674 boolean filterAll = false; 675 int startRow = 0; 676 float size = 1.0f; 677 int perClientRunRows = DEFAULT_ROWS_PER_GB; 678 int numClientThreads = 1; 679 int totalRows = DEFAULT_ROWS_PER_GB; 680 int measureAfter = 0; 681 float sampleRate = 1.0f; 682 /** 683 * @deprecated Useless after switching to OpenTelemetry 684 */ 685 @Deprecated 686 double traceRate = 0.0; 687 String tableName = TABLE_NAME; 688 boolean flushCommits = true; 689 boolean writeToWAL = true; 690 boolean autoFlush = false; 691 boolean oneCon = false; 692 int connCount = -1; // wil decide the actual num later 693 boolean useTags = false; 694 int noOfTags = 1; 695 boolean reportLatency = false; 696 int multiGet = 0; 697 int multiPut = 0; 698 int randomSleep = 0; 699 boolean inMemoryCF = false; 700 int presplitRegions = 0; 701 int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION; 702 String splitPolicy = null; 703 Compression.Algorithm compression = Compression.Algorithm.NONE; 704 String encryption = null; 705 BloomType bloomType = BloomType.ROW; 706 int blockSize = HConstants.DEFAULT_BLOCKSIZE; 707 DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; 708 boolean valueRandom = false; 709 boolean valueZipf = false; 710 int valueSize = DEFAULT_VALUE_LENGTH; 711 int period = (this.perClientRunRows / 10) == 0 ? 
perClientRunRows : perClientRunRows / 10; 712 int cycles = 1; 713 int columns = 1; 714 int families = 1; 715 int caching = 30; 716 int latencyThreshold = 0; // in millsecond 717 boolean addColumns = true; 718 MemoryCompactionPolicy inMemoryCompaction = 719 MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT); 720 boolean asyncPrefetch = false; 721 boolean cacheBlocks = true; 722 Scan.ReadType scanReadType = Scan.ReadType.DEFAULT; 723 long bufferSize = 2l * 1024l * 1024l; 724 725 public TestOptions() { 726 } 727 728 /** 729 * Clone constructor. 730 * @param that Object to copy from. 731 */ 732 public TestOptions(TestOptions that) { 733 this.cmdName = that.cmdName; 734 this.cycles = that.cycles; 735 this.nomapred = that.nomapred; 736 this.startRow = that.startRow; 737 this.size = that.size; 738 this.perClientRunRows = that.perClientRunRows; 739 this.numClientThreads = that.numClientThreads; 740 this.totalRows = that.totalRows; 741 this.sampleRate = that.sampleRate; 742 this.traceRate = that.traceRate; 743 this.tableName = that.tableName; 744 this.flushCommits = that.flushCommits; 745 this.writeToWAL = that.writeToWAL; 746 this.autoFlush = that.autoFlush; 747 this.oneCon = that.oneCon; 748 this.connCount = that.connCount; 749 this.useTags = that.useTags; 750 this.noOfTags = that.noOfTags; 751 this.reportLatency = that.reportLatency; 752 this.latencyThreshold = that.latencyThreshold; 753 this.multiGet = that.multiGet; 754 this.multiPut = that.multiPut; 755 this.inMemoryCF = that.inMemoryCF; 756 this.presplitRegions = that.presplitRegions; 757 this.replicas = that.replicas; 758 this.splitPolicy = that.splitPolicy; 759 this.compression = that.compression; 760 this.encryption = that.encryption; 761 this.blockEncoding = that.blockEncoding; 762 this.filterAll = that.filterAll; 763 this.bloomType = that.bloomType; 764 this.blockSize = that.blockSize; 765 this.valueRandom = that.valueRandom; 766 this.valueZipf = that.valueZipf; 767 this.valueSize = that.valueSize; 768 this.period = that.period; 769 this.randomSleep = that.randomSleep; 770 this.measureAfter = that.measureAfter; 771 this.addColumns = that.addColumns; 772 this.columns = that.columns; 773 this.families = that.families; 774 this.caching = that.caching; 775 this.inMemoryCompaction = that.inMemoryCompaction; 776 this.asyncPrefetch = that.asyncPrefetch; 777 this.cacheBlocks = that.cacheBlocks; 778 this.scanReadType = that.scanReadType; 779 this.bufferSize = that.bufferSize; 780 } 781 782 public int getCaching() { 783 return this.caching; 784 } 785 786 public void setCaching(final int caching) { 787 this.caching = caching; 788 } 789 790 public int getColumns() { 791 return this.columns; 792 } 793 794 public void setColumns(final int columns) { 795 this.columns = columns; 796 } 797 798 public int getFamilies() { 799 return this.families; 800 } 801 802 public void setFamilies(final int families) { 803 this.families = families; 804 } 805 806 public int getCycles() { 807 return this.cycles; 808 } 809 810 public void setCycles(final int cycles) { 811 this.cycles = cycles; 812 } 813 814 public boolean isValueZipf() { 815 return valueZipf; 816 } 817 818 public void setValueZipf(boolean valueZipf) { 819 this.valueZipf = valueZipf; 820 } 821 822 public String getCmdName() { 823 return cmdName; 824 } 825 826 public void setCmdName(String cmdName) { 827 this.cmdName = cmdName; 828 } 829 830 public int getRandomSleep() { 831 return randomSleep; 832 } 833 834 public void setRandomSleep(int randomSleep) { 835 this.randomSleep 
= randomSleep; 836 } 837 838 public int getReplicas() { 839 return replicas; 840 } 841 842 public void setReplicas(int replicas) { 843 this.replicas = replicas; 844 } 845 846 public String getSplitPolicy() { 847 return splitPolicy; 848 } 849 850 public void setSplitPolicy(String splitPolicy) { 851 this.splitPolicy = splitPolicy; 852 } 853 854 public void setNomapred(boolean nomapred) { 855 this.nomapred = nomapred; 856 } 857 858 public void setFilterAll(boolean filterAll) { 859 this.filterAll = filterAll; 860 } 861 862 public void setStartRow(int startRow) { 863 this.startRow = startRow; 864 } 865 866 public void setSize(float size) { 867 this.size = size; 868 } 869 870 public void setPerClientRunRows(int perClientRunRows) { 871 this.perClientRunRows = perClientRunRows; 872 } 873 874 public void setNumClientThreads(int numClientThreads) { 875 this.numClientThreads = numClientThreads; 876 } 877 878 public void setTotalRows(int totalRows) { 879 this.totalRows = totalRows; 880 } 881 882 public void setSampleRate(float sampleRate) { 883 this.sampleRate = sampleRate; 884 } 885 886 public void setTraceRate(double traceRate) { 887 this.traceRate = traceRate; 888 } 889 890 public void setTableName(String tableName) { 891 this.tableName = tableName; 892 } 893 894 public void setFlushCommits(boolean flushCommits) { 895 this.flushCommits = flushCommits; 896 } 897 898 public void setWriteToWAL(boolean writeToWAL) { 899 this.writeToWAL = writeToWAL; 900 } 901 902 public void setAutoFlush(boolean autoFlush) { 903 this.autoFlush = autoFlush; 904 } 905 906 public void setOneCon(boolean oneCon) { 907 this.oneCon = oneCon; 908 } 909 910 public int getConnCount() { 911 return connCount; 912 } 913 914 public void setConnCount(int connCount) { 915 this.connCount = connCount; 916 } 917 918 public void setUseTags(boolean useTags) { 919 this.useTags = useTags; 920 } 921 922 public void setNoOfTags(int noOfTags) { 923 this.noOfTags = noOfTags; 924 } 925 926 public void setReportLatency(boolean reportLatency) { 927 this.reportLatency = reportLatency; 928 } 929 930 public void setMultiGet(int multiGet) { 931 this.multiGet = multiGet; 932 } 933 934 public void setMultiPut(int multiPut) { 935 this.multiPut = multiPut; 936 } 937 938 public void setInMemoryCF(boolean inMemoryCF) { 939 this.inMemoryCF = inMemoryCF; 940 } 941 942 public void setPresplitRegions(int presplitRegions) { 943 this.presplitRegions = presplitRegions; 944 } 945 946 public void setCompression(Compression.Algorithm compression) { 947 this.compression = compression; 948 } 949 950 public void setEncryption(String encryption) { 951 this.encryption = encryption; 952 } 953 954 public void setBloomType(BloomType bloomType) { 955 this.bloomType = bloomType; 956 } 957 958 public void setBlockSize(int blockSize) { 959 this.blockSize = blockSize; 960 } 961 962 public void setBlockEncoding(DataBlockEncoding blockEncoding) { 963 this.blockEncoding = blockEncoding; 964 } 965 966 public void setValueRandom(boolean valueRandom) { 967 this.valueRandom = valueRandom; 968 } 969 970 public void setValueSize(int valueSize) { 971 this.valueSize = valueSize; 972 } 973 974 public void setBufferSize(long bufferSize) { 975 this.bufferSize = bufferSize; 976 } 977 978 public void setPeriod(int period) { 979 this.period = period; 980 } 981 982 public boolean isNomapred() { 983 return nomapred; 984 } 985 986 public boolean isFilterAll() { 987 return filterAll; 988 } 989 990 public int getStartRow() { 991 return startRow; 992 } 993 994 public float getSize() { 995 return size; 
996 } 997 998 public int getPerClientRunRows() { 999 return perClientRunRows; 1000 } 1001 1002 public int getNumClientThreads() { 1003 return numClientThreads; 1004 } 1005 1006 public int getTotalRows() { 1007 return totalRows; 1008 } 1009 1010 public float getSampleRate() { 1011 return sampleRate; 1012 } 1013 1014 public double getTraceRate() { 1015 return traceRate; 1016 } 1017 1018 public String getTableName() { 1019 return tableName; 1020 } 1021 1022 public boolean isFlushCommits() { 1023 return flushCommits; 1024 } 1025 1026 public boolean isWriteToWAL() { 1027 return writeToWAL; 1028 } 1029 1030 public boolean isAutoFlush() { 1031 return autoFlush; 1032 } 1033 1034 public boolean isUseTags() { 1035 return useTags; 1036 } 1037 1038 public int getNoOfTags() { 1039 return noOfTags; 1040 } 1041 1042 public boolean isReportLatency() { 1043 return reportLatency; 1044 } 1045 1046 public int getMultiGet() { 1047 return multiGet; 1048 } 1049 1050 public int getMultiPut() { 1051 return multiPut; 1052 } 1053 1054 public boolean isInMemoryCF() { 1055 return inMemoryCF; 1056 } 1057 1058 public int getPresplitRegions() { 1059 return presplitRegions; 1060 } 1061 1062 public Compression.Algorithm getCompression() { 1063 return compression; 1064 } 1065 1066 public String getEncryption() { 1067 return encryption; 1068 } 1069 1070 public DataBlockEncoding getBlockEncoding() { 1071 return blockEncoding; 1072 } 1073 1074 public boolean isValueRandom() { 1075 return valueRandom; 1076 } 1077 1078 public int getValueSize() { 1079 return valueSize; 1080 } 1081 1082 public int getPeriod() { 1083 return period; 1084 } 1085 1086 public BloomType getBloomType() { 1087 return bloomType; 1088 } 1089 1090 public int getBlockSize() { 1091 return blockSize; 1092 } 1093 1094 public boolean isOneCon() { 1095 return oneCon; 1096 } 1097 1098 public int getMeasureAfter() { 1099 return measureAfter; 1100 } 1101 1102 public void setMeasureAfter(int measureAfter) { 1103 this.measureAfter = measureAfter; 1104 } 1105 1106 public boolean getAddColumns() { 1107 return addColumns; 1108 } 1109 1110 public void setAddColumns(boolean addColumns) { 1111 this.addColumns = addColumns; 1112 } 1113 1114 public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) { 1115 this.inMemoryCompaction = inMemoryCompaction; 1116 } 1117 1118 public MemoryCompactionPolicy getInMemoryCompaction() { 1119 return this.inMemoryCompaction; 1120 } 1121 1122 public long getBufferSize() { 1123 return this.bufferSize; 1124 } 1125 } 1126 1127 /* 1128 * A test. Subclass to particularize what happens per row. 1129 */ 1130 static abstract class TestBase { 1131 // Below is make it so when Tests are all running in the one 1132 // jvm, that they each have a differently seeded Random. 
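    // Each TestBase instance draws its own seed from this shared generator (see nextRandomSeed()).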
1133 private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime()); 1134 1135 private static long nextRandomSeed() { 1136 return randomSeed.nextLong(); 1137 } 1138 1139 private final int everyN; 1140 1141 protected final Random rand = new Random(nextRandomSeed()); 1142 protected final Configuration conf; 1143 protected final TestOptions opts; 1144 1145 private final Status status; 1146 1147 private String testName; 1148 private Histogram latencyHistogram; 1149 private Histogram replicaLatencyHistogram; 1150 private Histogram valueSizeHistogram; 1151 private Histogram rpcCallsHistogram; 1152 private Histogram remoteRpcCallsHistogram; 1153 private Histogram millisBetweenNextHistogram; 1154 private Histogram regionsScannedHistogram; 1155 private Histogram bytesInResultsHistogram; 1156 private Histogram bytesInRemoteResultsHistogram; 1157 private RandomDistribution.Zipf zipf; 1158 private long numOfReplyOverLatencyThreshold = 0; 1159 private long numOfReplyFromReplica = 0; 1160 1161 /** 1162 * Note that all subclasses of this class must provide a public constructor that has the exact 1163 * same list of arguments. 1164 */ 1165 TestBase(final Configuration conf, final TestOptions options, final Status status) { 1166 this.conf = conf; 1167 this.opts = options; 1168 this.status = status; 1169 this.testName = this.getClass().getSimpleName(); 1170 everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate)); 1171 if (options.isValueZipf()) { 1172 this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2); 1173 } 1174 LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows."); 1175 } 1176 1177 int getValueLength(final Random r) { 1178 if (this.opts.isValueRandom()) { 1179 return r.nextInt(opts.valueSize); 1180 } else if (this.opts.isValueZipf()) { 1181 return Math.abs(this.zipf.nextInt()); 1182 } else { 1183 return opts.valueSize; 1184 } 1185 } 1186 1187 void updateValueSize(final Result[] rs) throws IOException { 1188 updateValueSize(rs, 0); 1189 } 1190 1191 void updateValueSize(final Result[] rs, final long latency) throws IOException { 1192 if (rs == null || (latency == 0)) return; 1193 for (Result r : rs) 1194 updateValueSize(r, latency); 1195 } 1196 1197 void updateValueSize(final Result r) throws IOException { 1198 updateValueSize(r, 0); 1199 } 1200 1201 void updateValueSize(final Result r, final long latency) throws IOException { 1202 if (r == null || (latency == 0)) return; 1203 int size = 0; 1204 // update replicaHistogram 1205 if (r.isStale()) { 1206 replicaLatencyHistogram.update(latency / 1000); 1207 numOfReplyFromReplica++; 1208 } 1209 if (!isRandomValueSize()) return; 1210 1211 for (CellScanner scanner = r.cellScanner(); scanner.advance();) { 1212 size += scanner.current().getValueLength(); 1213 } 1214 updateValueSize(size); 1215 } 1216 1217 void updateValueSize(final int valueSize) { 1218 if (!isRandomValueSize()) return; 1219 this.valueSizeHistogram.update(valueSize); 1220 } 1221 1222 void updateScanMetrics(final ScanMetrics metrics) { 1223 if (metrics == null) return; 1224 Map<String, Long> metricsMap = metrics.getMetricsMap(); 1225 Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); 1226 if (rpcCalls != null) { 1227 this.rpcCallsHistogram.update(rpcCalls.longValue()); 1228 } 1229 Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME); 1230 if (remoteRpcCalls != null) { 1231 this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue()); 1232 } 1233 
Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME); 1234 if (millisBetweenNext != null) { 1235 this.millisBetweenNextHistogram.update(millisBetweenNext.longValue()); 1236 } 1237 Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); 1238 if (regionsScanned != null) { 1239 this.regionsScannedHistogram.update(regionsScanned.longValue()); 1240 } 1241 Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME); 1242 if (bytesInResults != null && bytesInResults.longValue() > 0) { 1243 this.bytesInResultsHistogram.update(bytesInResults.longValue()); 1244 } 1245 Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME); 1246 if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) { 1247 this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue()); 1248 } 1249 } 1250 1251 String generateStatus(final int sr, final int i, final int lr) { 1252 return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency [" 1253 + getShortLatencyReport() + "]" 1254 + (!isRandomValueSize() ? "" : ", value size [" + getShortValueSizeReport() + "]"); 1255 } 1256 1257 boolean isRandomValueSize() { 1258 return opts.valueRandom; 1259 } 1260 1261 protected int getReportingPeriod() { 1262 return opts.period; 1263 } 1264 1265 /** 1266 * Populated by testTakedown. Only implemented by RandomReadTest at the moment. 1267 */ 1268 public Histogram getLatencyHistogram() { 1269 return latencyHistogram; 1270 } 1271 1272 void testSetup() throws IOException { 1273 // test metrics 1274 latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1275 // If it is a replica test, set up histogram for replica. 1276 if (opts.replicas > 1) { 1277 replicaLatencyHistogram = 1278 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1279 } 1280 valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1281 // scan metrics 1282 rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1283 remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1284 millisBetweenNextHistogram = 1285 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1286 regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1287 bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1288 bytesInRemoteResultsHistogram = 1289 YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500)); 1290 1291 onStartup(); 1292 } 1293 1294 abstract void onStartup() throws IOException; 1295 1296 void testTakedown() throws IOException { 1297 onTakedown(); 1298 // Print all stats for this thread continuously. 1299 // Synchronize on Test.class so different threads don't intermingle the 1300 // output. We can't use 'this' here because each thread has its own instance of Test class. 
1301 synchronized (Test.class) { 1302 status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName()); 1303 status 1304 .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram)); 1305 if (opts.replicas > 1) { 1306 status.setStatus("Latency (us) from Replica Regions: " 1307 + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram)); 1308 } 1309 status.setStatus("Num measures (latency) : " + latencyHistogram.getCount()); 1310 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram)); 1311 if (valueSizeHistogram.getCount() > 0) { 1312 status.setStatus( 1313 "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram)); 1314 status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount()); 1315 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram)); 1316 } else { 1317 status.setStatus("No valueSize statistics available"); 1318 } 1319 if (rpcCallsHistogram.getCount() > 0) { 1320 status.setStatus( 1321 "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram)); 1322 } 1323 if (remoteRpcCallsHistogram.getCount() > 0) { 1324 status.setStatus("remoteRpcCalls (count): " 1325 + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram)); 1326 } 1327 if (millisBetweenNextHistogram.getCount() > 0) { 1328 status.setStatus("millisBetweenNext (latency): " 1329 + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram)); 1330 } 1331 if (regionsScannedHistogram.getCount() > 0) { 1332 status.setStatus("regionsScanned (count): " 1333 + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram)); 1334 } 1335 if (bytesInResultsHistogram.getCount() > 0) { 1336 status.setStatus("bytesInResults (size): " 1337 + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram)); 1338 } 1339 if (bytesInRemoteResultsHistogram.getCount() > 0) { 1340 status.setStatus("bytesInRemoteResults (size): " 1341 + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram)); 1342 } 1343 } 1344 } 1345 1346 abstract void onTakedown() throws IOException; 1347 1348 /* 1349 * Run test 1350 * @return Elapsed time. 1351 */ 1352 long test() throws IOException, InterruptedException { 1353 testSetup(); 1354 LOG.info("Timed test starting in thread " + Thread.currentThread().getName()); 1355 final long startTime = System.nanoTime(); 1356 try { 1357 testTimed(); 1358 } finally { 1359 testTakedown(); 1360 } 1361 return (System.nanoTime() - startTime) / 1000000; 1362 } 1363 1364 int getStartRow() { 1365 return opts.startRow; 1366 } 1367 1368 int getLastRow() { 1369 return getStartRow() + opts.perClientRunRows; 1370 } 1371 1372 /** 1373 * Provides an extension point for tests that don't want a per row invocation. 1374 */ 1375 void testTimed() throws IOException, InterruptedException { 1376 int startRow = getStartRow(); 1377 int lastRow = getLastRow(); 1378 // Report on completion of 1/10th of total. 
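      // Only every everyN-th row is actually issued (sampling driven by opts.sampleRate), and
      // latencies are recorded only after opts.measureAfter rows have been processed.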
1379 for (int ii = 0; ii < opts.cycles; ii++) { 1380 if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles); 1381 for (int i = startRow; i < lastRow; i++) { 1382 if (i % everyN != 0) continue; 1383 long startTime = System.nanoTime(); 1384 boolean requestSent = false; 1385 Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan(); 1386 try (Scope scope = span.makeCurrent()) { 1387 requestSent = testRow(i, startTime); 1388 } finally { 1389 span.end(); 1390 } 1391 if ((i - startRow) > opts.measureAfter) { 1392 // If multiget or multiput is enabled, say set to 10, testRow() returns immediately 1393 // first 9 times and sends the actual get request in the 10th iteration. 1394 // We should only set latency when actual request is sent because otherwise 1395 // it turns out to be 0. 1396 if (requestSent) { 1397 long latency = (System.nanoTime() - startTime) / 1000; 1398 latencyHistogram.update(latency); 1399 if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) { 1400 numOfReplyOverLatencyThreshold++; 1401 } 1402 } 1403 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { 1404 status.setStatus(generateStatus(startRow, i, lastRow)); 1405 } 1406 } 1407 } 1408 } 1409 } 1410 1411 /** Returns Subset of the histograms' calculation. */ 1412 public String getShortLatencyReport() { 1413 return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram); 1414 } 1415 1416 /** Returns Subset of the histograms' calculation. */ 1417 public String getShortValueSizeReport() { 1418 return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram); 1419 } 1420 1421 /** 1422 * Test for individual row. 1423 * @param i Row index. 1424 * @return true if the row was sent to server and need to record metrics. False if not, multiGet 1425 * and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered. 1426 */ 1427 abstract boolean testRow(final int i, final long startTime) 1428 throws IOException, InterruptedException; 1429 } 1430 1431 static abstract class Test extends TestBase { 1432 protected Connection connection; 1433 1434 Test(final Connection con, final TestOptions options, final Status status) { 1435 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status); 1436 this.connection = con; 1437 } 1438 } 1439 1440 static abstract class AsyncTest extends TestBase { 1441 protected AsyncConnection connection; 1442 1443 AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) { 1444 super(con == null ? 
HBaseConfiguration.create() : con.getConfiguration(), options, status); 1445 this.connection = con; 1446 } 1447 } 1448 1449 static abstract class TableTest extends Test { 1450 protected Table table; 1451 1452 TableTest(Connection con, TestOptions options, Status status) { 1453 super(con, options, status); 1454 } 1455 1456 @Override 1457 void onStartup() throws IOException { 1458 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1459 } 1460 1461 @Override 1462 void onTakedown() throws IOException { 1463 table.close(); 1464 } 1465 } 1466 1467 /* 1468 * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest 1469 */ 1470 static abstract class MetaTest extends TableTest { 1471 protected int keyLength; 1472 1473 MetaTest(Connection con, TestOptions options, Status status) { 1474 super(con, options, status); 1475 keyLength = Integer.toString(opts.perClientRunRows).length(); 1476 } 1477 1478 @Override 1479 void onTakedown() throws IOException { 1480 // No clean up 1481 } 1482 1483 /* 1484 * Generates Lexicographically ascending strings 1485 */ 1486 protected byte[] getSplitKey(final int i) { 1487 return Bytes.toBytes(String.format("%0" + keyLength + "d", i)); 1488 } 1489 1490 } 1491 1492 static abstract class AsyncTableTest extends AsyncTest { 1493 protected AsyncTable<?> table; 1494 1495 AsyncTableTest(AsyncConnection con, TestOptions options, Status status) { 1496 super(con, options, status); 1497 } 1498 1499 @Override 1500 void onStartup() throws IOException { 1501 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1502 } 1503 1504 @Override 1505 void onTakedown() throws IOException { 1506 } 1507 } 1508 1509 static class AsyncRandomReadTest extends AsyncTableTest { 1510 private final Consistency consistency; 1511 private ArrayList<Get> gets; 1512 1513 AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) { 1514 super(con, options, status); 1515 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1516 if (opts.multiGet > 0) { 1517 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1518 this.gets = new ArrayList<>(opts.multiGet); 1519 } 1520 } 1521 1522 @Override 1523 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1524 if (opts.randomSleep > 0) { 1525 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 1526 } 1527 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1528 for (int family = 0; family < opts.families; family++) { 1529 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1530 if (opts.addColumns) { 1531 for (int column = 0; column < opts.columns; column++) { 1532 byte[] qualifier = column == 0 ? 
COLUMN_ZERO : Bytes.toBytes("" + column); 1533 get.addColumn(familyName, qualifier); 1534 } 1535 } else { 1536 get.addFamily(familyName); 1537 } 1538 } 1539 if (opts.filterAll) { 1540 get.setFilter(new FilterAllFilter()); 1541 } 1542 get.setConsistency(consistency); 1543 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1544 try { 1545 if (opts.multiGet > 0) { 1546 this.gets.add(get); 1547 if (this.gets.size() == opts.multiGet) { 1548 Result[] rs = 1549 this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new); 1550 updateValueSize(rs); 1551 this.gets.clear(); 1552 } else { 1553 return false; 1554 } 1555 } else { 1556 updateValueSize(this.table.get(get).get()); 1557 } 1558 } catch (ExecutionException e) { 1559 throw new IOException(e); 1560 } 1561 return true; 1562 } 1563 1564 public static RuntimeException runtime(Throwable e) { 1565 if (e instanceof RuntimeException) { 1566 return (RuntimeException) e; 1567 } 1568 return new RuntimeException(e); 1569 } 1570 1571 public static <V> V propagate(Callable<V> callable) { 1572 try { 1573 return callable.call(); 1574 } catch (Exception e) { 1575 throw runtime(e); 1576 } 1577 } 1578 1579 @Override 1580 protected int getReportingPeriod() { 1581 int period = opts.perClientRunRows / 10; 1582 return period == 0 ? opts.perClientRunRows : period; 1583 } 1584 1585 @Override 1586 protected void testTakedown() throws IOException { 1587 if (this.gets != null && this.gets.size() > 0) { 1588 this.table.get(gets); 1589 this.gets.clear(); 1590 } 1591 super.testTakedown(); 1592 } 1593 } 1594 1595 static class AsyncRandomWriteTest extends AsyncSequentialWriteTest { 1596 1597 AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) { 1598 super(con, options, status); 1599 } 1600 1601 @Override 1602 protected byte[] generateRow(final int i) { 1603 return getRandomRow(this.rand, opts.totalRows); 1604 } 1605 } 1606 1607 static class AsyncScanTest extends AsyncTableTest { 1608 private ResultScanner testScanner; 1609 private AsyncTable<?> asyncTable; 1610 1611 AsyncScanTest(AsyncConnection con, TestOptions options, Status status) { 1612 super(con, options, status); 1613 } 1614 1615 @Override 1616 void onStartup() throws IOException { 1617 this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName), 1618 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); 1619 } 1620 1621 @Override 1622 void testTakedown() throws IOException { 1623 if (this.testScanner != null) { 1624 updateScanMetrics(this.testScanner.getScanMetrics()); 1625 this.testScanner.close(); 1626 } 1627 super.testTakedown(); 1628 } 1629 1630 @Override 1631 boolean testRow(final int i, final long startTime) throws IOException { 1632 if (this.testScanner == null) { 1633 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 1634 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1635 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1636 for (int family = 0; family < opts.families; family++) { 1637 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1638 if (opts.addColumns) { 1639 for (int column = 0; column < opts.columns; column++) { 1640 byte[] qualifier = column == 0 ? 
COLUMN_ZERO : Bytes.toBytes("" + column); 1641 scan.addColumn(familyName, qualifier); 1642 } 1643 } else { 1644 scan.addFamily(familyName); 1645 } 1646 } 1647 if (opts.filterAll) { 1648 scan.setFilter(new FilterAllFilter()); 1649 } 1650 this.testScanner = asyncTable.getScanner(scan); 1651 } 1652 Result r = testScanner.next(); 1653 updateValueSize(r); 1654 return true; 1655 } 1656 } 1657 1658 static class AsyncSequentialReadTest extends AsyncTableTest { 1659 AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) { 1660 super(con, options, status); 1661 } 1662 1663 @Override 1664 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1665 Get get = new Get(format(i)); 1666 for (int family = 0; family < opts.families; family++) { 1667 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1668 if (opts.addColumns) { 1669 for (int column = 0; column < opts.columns; column++) { 1670 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1671 get.addColumn(familyName, qualifier); 1672 } 1673 } else { 1674 get.addFamily(familyName); 1675 } 1676 } 1677 if (opts.filterAll) { 1678 get.setFilter(new FilterAllFilter()); 1679 } 1680 try { 1681 updateValueSize(table.get(get).get()); 1682 } catch (ExecutionException e) { 1683 throw new IOException(e); 1684 } 1685 return true; 1686 } 1687 } 1688 1689 static class AsyncSequentialWriteTest extends AsyncTableTest { 1690 private ArrayList<Put> puts; 1691 1692 AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) { 1693 super(con, options, status); 1694 if (opts.multiPut > 0) { 1695 LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + "."); 1696 this.puts = new ArrayList<>(opts.multiPut); 1697 } 1698 } 1699 1700 protected byte[] generateRow(final int i) { 1701 return format(i); 1702 } 1703 1704 @Override 1705 @SuppressWarnings("ReturnValueIgnored") 1706 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1707 byte[] row = generateRow(i); 1708 Put put = new Put(row); 1709 for (int family = 0; family < opts.families; family++) { 1710 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1711 for (int column = 0; column < opts.columns; column++) { 1712 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1713 byte[] value = generateData(this.rand, getValueLength(this.rand)); 1714 if (opts.useTags) { 1715 byte[] tag = generateData(this.rand, TAG_LENGTH); 1716 Tag[] tags = new Tag[opts.noOfTags]; 1717 for (int n = 0; n < opts.noOfTags; n++) { 1718 Tag t = new ArrayBackedTag((byte) n, tag); 1719 tags[n] = t; 1720 } 1721 KeyValue kv = 1722 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 1723 put.add(kv); 1724 updateValueSize(kv.getValueLength()); 1725 } else { 1726 put.addColumn(familyName, qualifier, value); 1727 updateValueSize(value.length); 1728 } 1729 } 1730 } 1731 put.setDurability(opts.writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL); 1732 try { 1733 // Either batch the put (multiPut > 0) or send it individually below, never both; otherwise every row would be written twice. 1734 if (opts.multiPut > 0) { 1735 this.puts.add(put); 1736 if (this.puts.size() == opts.multiPut) { 1737 this.table.put(puts).forEach(f -> AsyncRandomReadTest.propagate(f::get)); // wait on every batched future 1738 this.puts.clear(); 1739 } else { 1740 return false; 1741 } 1742 } else { 1743 table.put(put).get(); 1744 } 1745 } catch (ExecutionException e) { 1746 throw new IOException(e); 1747 } 1748 return true; 1749 } 1750 } 1751 1752 static abstract class BufferedMutatorTest extends Test { 1753 protected BufferedMutator mutator; 1754 protected Table table; 1755 1756 BufferedMutatorTest(Connection con, TestOptions options, Status status) { 1757 super(con, options, status); 1758 } 1759 1760 @Override 1761 void onStartup() throws IOException { 1762 BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName)); 1763 p.writeBufferSize(opts.bufferSize); 1764 this.mutator = connection.getBufferedMutator(p); 1765 this.table = connection.getTable(TableName.valueOf(opts.tableName)); 1766 } 1767 1768 @Override 1769 void onTakedown() throws IOException { 1770 mutator.close(); 1771 table.close(); 1772 } 1773 } 1774 1775 static class RandomSeekScanTest extends TableTest { 1776 RandomSeekScanTest(Connection con, TestOptions options, Status status) { 1777 super(con, options, status); 1778 } 1779 1780 @Override 1781 boolean testRow(final int i, final long startTime) throws IOException { 1782 Scan scan = 1783 new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)).setCaching(opts.caching) 1784 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1785 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1786 FilterList list = new FilterList(); 1787 for (int family = 0; family < opts.families; family++) { 1788 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1789 if (opts.addColumns) { 1790 for (int column = 0; column < opts.columns; column++) { 1791 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1792 scan.addColumn(familyName, qualifier); 1793 } 1794 } else { 1795 scan.addFamily(familyName); 1796 } 1797 } 1798 if (opts.filterAll) { 1799 list.addFilter(new FilterAllFilter()); 1800 } 1801 list.addFilter(new WhileMatchFilter(new PageFilter(120))); 1802 scan.setFilter(list); 1803 ResultScanner s = this.table.getScanner(scan); 1804 try { 1805 for (Result rr; (rr = s.next()) != null;) { 1806 updateValueSize(rr); 1807 } 1808 } finally { 1809 updateScanMetrics(s.getScanMetrics()); 1810 s.close(); 1811 } 1812 return true; 1813 } 1814 1815 @Override 1816 protected int getReportingPeriod() { 1817 int period = opts.perClientRunRows / 100; 1818 return period == 0 ? 
opts.perClientRunRows : period; 1819 } 1820 1821 } 1822 1823 static abstract class RandomScanWithRangeTest extends TableTest { 1824 RandomScanWithRangeTest(Connection con, TestOptions options, Status status) { 1825 super(con, options, status); 1826 } 1827 1828 @Override 1829 boolean testRow(final int i, final long startTime) throws IOException { 1830 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); 1831 Scan scan = new Scan().withStartRow(startAndStopRow.getFirst()) 1832 .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching) 1833 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 1834 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 1835 for (int family = 0; family < opts.families; family++) { 1836 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1837 if (opts.addColumns) { 1838 for (int column = 0; column < opts.columns; column++) { 1839 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 1840 scan.addColumn(familyName, qualifier); 1841 } 1842 } else { 1843 scan.addFamily(familyName); 1844 } 1845 } 1846 if (opts.filterAll) { 1847 scan.setFilter(new FilterAllFilter()); 1848 } 1849 Result r = null; 1850 int count = 0; 1851 ResultScanner s = this.table.getScanner(scan); 1852 try { 1853 for (; (r = s.next()) != null;) { 1854 updateValueSize(r); 1855 count++; 1856 } 1857 if (i % 100 == 0) { 1858 LOG.info(String.format("Scan for key range %s - %s returned %s rows", 1859 Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()), 1860 count)); 1861 } 1862 } finally { 1863 updateScanMetrics(s.getScanMetrics()); 1864 s.close(); 1865 } 1866 return true; 1867 } 1868 1869 protected abstract Pair<byte[], byte[]> getStartAndStopRow(); 1870 1871 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) { 1872 int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows; 1873 int stop = start + maxRange; 1874 return new Pair<>(format(start), format(stop)); 1875 } 1876 1877 @Override 1878 protected int getReportingPeriod() { 1879 int period = opts.perClientRunRows / 100; 1880 return period == 0 ? 
opts.perClientRunRows : period; 1881 } 1882 } 1883 1884 static class RandomScanWithRange10Test extends RandomScanWithRangeTest { 1885 RandomScanWithRange10Test(Connection con, TestOptions options, Status status) { 1886 super(con, options, status); 1887 } 1888 1889 @Override 1890 protected Pair<byte[], byte[]> getStartAndStopRow() { 1891 return generateStartAndStopRows(10); 1892 } 1893 } 1894 1895 static class RandomScanWithRange100Test extends RandomScanWithRangeTest { 1896 RandomScanWithRange100Test(Connection con, TestOptions options, Status status) { 1897 super(con, options, status); 1898 } 1899 1900 @Override 1901 protected Pair<byte[], byte[]> getStartAndStopRow() { 1902 return generateStartAndStopRows(100); 1903 } 1904 } 1905 1906 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest { 1907 RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) { 1908 super(con, options, status); 1909 } 1910 1911 @Override 1912 protected Pair<byte[], byte[]> getStartAndStopRow() { 1913 return generateStartAndStopRows(1000); 1914 } 1915 } 1916 1917 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest { 1918 RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) { 1919 super(con, options, status); 1920 } 1921 1922 @Override 1923 protected Pair<byte[], byte[]> getStartAndStopRow() { 1924 return generateStartAndStopRows(10000); 1925 } 1926 } 1927 1928 static class RandomReadTest extends TableTest { 1929 private final Consistency consistency; 1930 private ArrayList<Get> gets; 1931 1932 RandomReadTest(Connection con, TestOptions options, Status status) { 1933 super(con, options, status); 1934 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; 1935 if (opts.multiGet > 0) { 1936 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); 1937 this.gets = new ArrayList<>(opts.multiGet); 1938 } 1939 } 1940 1941 @Override 1942 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 1943 if (opts.randomSleep > 0) { 1944 Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep)); 1945 } 1946 Get get = new Get(getRandomRow(this.rand, opts.totalRows)); 1947 for (int family = 0; family < opts.families; family++) { 1948 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 1949 if (opts.addColumns) { 1950 for (int column = 0; column < opts.columns; column++) { 1951 byte[] qualifier = column == 0 ? 
COLUMN_ZERO : Bytes.toBytes("" + column); 1952 get.addColumn(familyName, qualifier); 1953 } 1954 } else { 1955 get.addFamily(familyName); 1956 } 1957 } 1958 if (opts.filterAll) { 1959 get.setFilter(new FilterAllFilter()); 1960 } 1961 get.setConsistency(consistency); 1962 if (LOG.isTraceEnabled()) LOG.trace(get.toString()); 1963 if (opts.multiGet > 0) { 1964 this.gets.add(get); 1965 if (this.gets.size() == opts.multiGet) { 1966 Result[] rs = this.table.get(this.gets); 1967 if (opts.replicas > 1) { 1968 long latency = System.nanoTime() - startTime; 1969 updateValueSize(rs, latency); 1970 } else { 1971 updateValueSize(rs); 1972 } 1973 this.gets.clear(); 1974 } else { 1975 return false; 1976 } 1977 } else { 1978 if (opts.replicas > 1) { 1979 Result r = this.table.get(get); 1980 long latency = System.nanoTime() - startTime; 1981 updateValueSize(r, latency); 1982 } else { 1983 updateValueSize(this.table.get(get)); 1984 } 1985 } 1986 return true; 1987 } 1988 1989 @Override 1990 protected int getReportingPeriod() { 1991 int period = opts.perClientRunRows / 10; 1992 return period == 0 ? opts.perClientRunRows : period; 1993 } 1994 1995 @Override 1996 protected void testTakedown() throws IOException { 1997 if (this.gets != null && this.gets.size() > 0) { 1998 this.table.get(gets); 1999 this.gets.clear(); 2000 } 2001 super.testTakedown(); 2002 } 2003 } 2004 2005 /* 2006 * Send random reads against fake regions inserted by MetaWriteTest 2007 */ 2008 static class MetaRandomReadTest extends MetaTest { 2009 private Random rd = new Random(); 2010 private RegionLocator regionLocator; 2011 2012 MetaRandomReadTest(Connection con, TestOptions options, Status status) { 2013 super(con, options, status); 2014 LOG.info("call getRegionLocation"); 2015 } 2016 2017 @Override 2018 void onStartup() throws IOException { 2019 super.onStartup(); 2020 this.regionLocator = connection.getRegionLocator(table.getName()); 2021 } 2022 2023 @Override 2024 boolean testRow(final int i, final long startTime) throws IOException, InterruptedException { 2025 if (opts.randomSleep > 0) { 2026 Thread.sleep(rd.nextInt(opts.randomSleep)); 2027 } 2028 HRegionLocation hRegionLocation = 2029 regionLocator.getRegionLocation(getSplitKey(rd.nextInt(opts.perClientRunRows)), true); 2030 LOG.debug("get location for region: " + hRegionLocation); 2031 return true; 2032 } 2033 2034 @Override 2035 protected int getReportingPeriod() { 2036 int period = opts.perClientRunRows / 10; 2037 return period == 0 ? 
opts.perClientRunRows : period; 2038 } 2039 2040 @Override 2041 protected void testTakedown() throws IOException { 2042 super.testTakedown(); 2043 } 2044 } 2045 2046 static class RandomWriteTest extends SequentialWriteTest { 2047 RandomWriteTest(Connection con, TestOptions options, Status status) { 2048 super(con, options, status); 2049 } 2050 2051 @Override 2052 protected byte[] generateRow(final int i) { 2053 return getRandomRow(this.rand, opts.totalRows); 2054 } 2055 2056 } 2057 2058 static class ScanTest extends TableTest { 2059 private ResultScanner testScanner; 2060 2061 ScanTest(Connection con, TestOptions options, Status status) { 2062 super(con, options, status); 2063 } 2064 2065 @Override 2066 void testTakedown() throws IOException { 2067 if (this.testScanner != null) { 2068 this.testScanner.close(); 2069 } 2070 super.testTakedown(); 2071 } 2072 2073 @Override 2074 boolean testRow(final int i, final long startTime) throws IOException { 2075 if (this.testScanner == null) { 2076 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) 2077 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) 2078 .setReadType(opts.scanReadType).setScanMetricsEnabled(true); 2079 for (int family = 0; family < opts.families; family++) { 2080 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2081 if (opts.addColumns) { 2082 for (int column = 0; column < opts.columns; column++) { 2083 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2084 scan.addColumn(familyName, qualifier); 2085 } 2086 } else { 2087 scan.addFamily(familyName); 2088 } 2089 } 2090 if (opts.filterAll) { 2091 scan.setFilter(new FilterAllFilter()); 2092 } 2093 this.testScanner = table.getScanner(scan); 2094 } 2095 Result r = testScanner.next(); 2096 updateValueSize(r); 2097 return true; 2098 } 2099 } 2100 2101 static class ReverseScanTest extends TableTest { 2102 private ResultScanner testScanner; 2103 2104 ReverseScanTest(Connection con, TestOptions options, Status status) { 2105 super(con, options, status); 2106 } 2107 2108 @Override 2109 void testTakedown() throws IOException { 2110 if (this.testScanner != null) { 2111 this.testScanner.close(); 2112 } 2113 super.testTakedown(); 2114 } 2115 2116 @Override 2117 boolean testRow(final int i, final long startTime) throws IOException { 2118 if (this.testScanner == null) { 2119 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 2120 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 2121 .setScanMetricsEnabled(true).setReversed(true); 2122 for (int family = 0; family < opts.families; family++) { 2123 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2124 if (opts.addColumns) { 2125 for (int column = 0; column < opts.columns; column++) { 2126 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2127 scan.addColumn(familyName, qualifier); 2128 } 2129 } else { 2130 scan.addFamily(familyName); 2131 } 2132 } 2133 if (opts.filterAll) { 2134 scan.setFilter(new FilterAllFilter()); 2135 } 2136 this.testScanner = table.getScanner(scan); 2137 } 2138 Result r = testScanner.next(); 2139 updateValueSize(r); 2140 return true; 2141 } 2142 } 2143 2144 /** 2145 * Base class for operations that are CAS-like; that read a value and then set it based off what 2146 * they read. In this category is increment, append, checkAndPut, etc. 2147 * <p> 2148 * These operations also want some concurrency going on. 
Usually when these tests run, they 2149 * operate in their own part of the key range. In CASTest, we will have them all overlap on the 2150 * same key space. We do this with our getStartRow and getLastRow overrides. 2151 */ 2152 static abstract class CASTableTest extends TableTest { 2153 private final byte[] qualifier; 2154 2155 CASTableTest(Connection con, TestOptions options, Status status) { 2156 super(con, options, status); 2157 qualifier = Bytes.toBytes(this.getClass().getSimpleName()); 2158 } 2159 2160 byte[] getQualifier() { 2161 return this.qualifier; 2162 } 2163 2164 @Override 2165 int getStartRow() { 2166 return 0; 2167 } 2168 2169 @Override 2170 int getLastRow() { 2171 return opts.perClientRunRows; 2172 } 2173 } 2174 2175 static class IncrementTest extends CASTableTest { 2176 IncrementTest(Connection con, TestOptions options, Status status) { 2177 super(con, options, status); 2178 } 2179 2180 @Override 2181 boolean testRow(final int i, final long startTime) throws IOException { 2182 Increment increment = new Increment(format(i)); 2183 // unlike checkAndXXX tests, which make most sense to do on a single value, 2184 // if multiple families are specified for an increment test we assume it is 2185 // meant to raise the work factor 2186 for (int family = 0; family < opts.families; family++) { 2187 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2188 increment.addColumn(familyName, getQualifier(), 1l); 2189 } 2190 updateValueSize(this.table.increment(increment)); 2191 return true; 2192 } 2193 } 2194 2195 static class AppendTest extends CASTableTest { 2196 AppendTest(Connection con, TestOptions options, Status status) { 2197 super(con, options, status); 2198 } 2199 2200 @Override 2201 boolean testRow(final int i, final long startTime) throws IOException { 2202 byte[] bytes = format(i); 2203 Append append = new Append(bytes); 2204 // unlike checkAndXXX tests, which make most sense to do on a single value, 2205 // if multiple families are specified for an append test we assume it is 2206 // meant to raise the work factor 2207 for (int family = 0; family < opts.families; family++) { 2208 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2209 append.addColumn(familyName, getQualifier(), bytes); 2210 } 2211 updateValueSize(this.table.append(append)); 2212 return true; 2213 } 2214 } 2215 2216 static class CheckAndMutateTest extends CASTableTest { 2217 CheckAndMutateTest(Connection con, TestOptions options, Status status) { 2218 super(con, options, status); 2219 } 2220 2221 @Override 2222 boolean testRow(final int i, final long startTime) throws IOException { 2223 final byte[] bytes = format(i); 2224 // checkAndXXX tests operate on only a single value 2225 // Put a known value so when we go to check it, it is there. 
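// The flow below is: write the cell, then issue a checkAndMutate that re-applies the same
// mutation only if the cell still equals the value just written. The check should therefore
// succeed on every iteration, keeping the benchmark on the server-side read-compare-write path.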
2226 Put put = new Put(bytes); 2227 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2228 this.table.put(put); 2229 RowMutations mutations = new RowMutations(bytes); 2230 mutations.add(put); 2231 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2232 .thenMutate(mutations); 2233 return true; 2234 } 2235 } 2236 2237 static class CheckAndPutTest extends CASTableTest { 2238 CheckAndPutTest(Connection con, TestOptions options, Status status) { 2239 super(con, options, status); 2240 } 2241 2242 @Override 2243 boolean testRow(final int i, final long startTime) throws IOException { 2244 final byte[] bytes = format(i); 2245 // checkAndXXX tests operate on only a single value 2246 // Put a known value so when we go to check it, it is there. 2247 Put put = new Put(bytes); 2248 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2249 this.table.put(put); 2250 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2251 .thenPut(put); 2252 return true; 2253 } 2254 } 2255 2256 static class CheckAndDeleteTest extends CASTableTest { 2257 CheckAndDeleteTest(Connection con, TestOptions options, Status status) { 2258 super(con, options, status); 2259 } 2260 2261 @Override 2262 boolean testRow(final int i, final long startTime) throws IOException { 2263 final byte[] bytes = format(i); 2264 // checkAndXXX tests operate on only a single value 2265 // Put a known value so when we go to check it, it is there. 2266 Put put = new Put(bytes); 2267 put.addColumn(FAMILY_ZERO, getQualifier(), bytes); 2268 this.table.put(put); 2269 Delete delete = new Delete(put.getRow()); 2270 delete.addColumn(FAMILY_ZERO, getQualifier()); 2271 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes) 2272 .thenDelete(delete); 2273 return true; 2274 } 2275 } 2276 2277 /* 2278 * Delete all fake regions inserted to meta table by MetaWriteTest. 2279 */ 2280 static class CleanMetaTest extends MetaTest { 2281 CleanMetaTest(Connection con, TestOptions options, Status status) { 2282 super(con, options, status); 2283 } 2284 2285 @Override 2286 boolean testRow(final int i, final long startTime) throws IOException { 2287 try { 2288 RegionInfo regionInfo = connection.getRegionLocator(table.getName()) 2289 .getRegionLocation(getSplitKey(i), false).getRegion(); 2290 LOG.debug("deleting region from meta: " + regionInfo); 2291 2292 Delete delete = 2293 MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP); 2294 try (Table t = MetaTableAccessor.getMetaHTable(connection)) { 2295 t.delete(delete); 2296 } 2297 } catch (IOException ie) { 2298 // Log and continue 2299 LOG.error("cannot find region with start key: " + i); 2300 } 2301 return true; 2302 } 2303 } 2304 2305 static class SequentialReadTest extends TableTest { 2306 SequentialReadTest(Connection con, TestOptions options, Status status) { 2307 super(con, options, status); 2308 } 2309 2310 @Override 2311 boolean testRow(final int i, final long startTime) throws IOException { 2312 Get get = new Get(format(i)); 2313 for (int family = 0; family < opts.families; family++) { 2314 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); 2315 if (opts.addColumns) { 2316 for (int column = 0; column < opts.columns; column++) { 2317 byte[] qualifier = column == 0 ? 
COLUMN_ZERO : Bytes.toBytes("" + column); 2318 get.addColumn(familyName, qualifier); 2319 } 2320 } else { 2321 get.addFamily(familyName); 2322 } 2323 } 2324 if (opts.filterAll) { 2325 get.setFilter(new FilterAllFilter()); 2326 } 2327 updateValueSize(table.get(get)); 2328 return true; 2329 } 2330 } 2331 2332 static class SequentialWriteTest extends BufferedMutatorTest { 2333 private ArrayList<Put> puts; 2334 2335 SequentialWriteTest(Connection con, TestOptions options, Status status) { 2336 super(con, options, status); 2337 if (opts.multiPut > 0) { 2338 LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + "."); 2339 this.puts = new ArrayList<>(opts.multiPut); 2340 } 2341 } 2342 2343 protected byte[] generateRow(final int i) { 2344 return format(i); 2345 } 2346 2347 @Override 2348 boolean testRow(final int i, final long startTime) throws IOException { 2349 byte[] row = generateRow(i); 2350 Put put = new Put(row); 2351 for (int family = 0; family < opts.families; family++) { 2352 byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family); 2353 for (int column = 0; column < opts.columns; column++) { 2354 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2355 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2356 if (opts.useTags) { 2357 byte[] tag = generateData(this.rand, TAG_LENGTH); 2358 Tag[] tags = new Tag[opts.noOfTags]; 2359 for (int n = 0; n < opts.noOfTags; n++) { 2360 Tag t = new ArrayBackedTag((byte) n, tag); 2361 tags[n] = t; 2362 } 2363 KeyValue kv = 2364 new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags); 2365 put.add(kv); 2366 updateValueSize(kv.getValueLength()); 2367 } else { 2368 put.addColumn(familyName, qualifier, value); 2369 updateValueSize(value.length); 2370 } 2371 } 2372 } 2373 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 2374 if (opts.autoFlush) { 2375 if (opts.multiPut > 0) { 2376 this.puts.add(put); 2377 if (this.puts.size() == opts.multiPut) { 2378 table.put(this.puts); 2379 this.puts.clear(); 2380 } else { 2381 return false; 2382 } 2383 } else { 2384 table.put(put); 2385 } 2386 } else { 2387 mutator.mutate(put); 2388 } 2389 return true; 2390 } 2391 } 2392 2393 /* 2394 * Insert fake regions into meta table with contiguous split keys. 
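* Each call to testRow(i) builds a RegionInfo whose start key is getSplitKey(i) and whose end key
* is getSplitKey(i + 1), adds it to hbase:meta through MetaTableAccessor, and then writes a fake
* server location for it. Because the split keys are zero-padded ascending strings, successive
* rows form contiguous, non-overlapping regions that MetaRandomReadTest can look up and
* CleanMetaTest can later remove.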
2395 */ 2396 static class MetaWriteTest extends MetaTest { 2397 2398 MetaWriteTest(Connection con, TestOptions options, Status status) { 2399 super(con, options, status); 2400 } 2401 2402 @Override 2403 boolean testRow(final int i, final long startTime) throws IOException { 2404 List<RegionInfo> regionInfos = new ArrayList<RegionInfo>(); 2405 RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME)) 2406 .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build()); 2407 regionInfos.add(regionInfo); 2408 MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1); 2409 2410 // write the serverName columns 2411 MetaTableAccessor.updateRegionLocation(connection, regionInfo, 2412 ServerName.valueOf("localhost", 60010, rand.nextLong()), i, 2413 EnvironmentEdgeManager.currentTime()); 2414 return true; 2415 } 2416 } 2417 2418 static class FilteredScanTest extends TableTest { 2419 protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName()); 2420 2421 FilteredScanTest(Connection con, TestOptions options, Status status) { 2422 super(con, options, status); 2423 if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) { 2424 LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB 2425 + ". This could take a very long time."); 2426 } 2427 } 2428 2429 @Override 2430 boolean testRow(int i, final long startTime) throws IOException { 2431 byte[] value = generateData(this.rand, getValueLength(this.rand)); 2432 Scan scan = constructScan(value); 2433 ResultScanner scanner = null; 2434 try { 2435 scanner = this.table.getScanner(scan); 2436 for (Result r = null; (r = scanner.next()) != null;) { 2437 updateValueSize(r); 2438 } 2439 } finally { 2440 if (scanner != null) { 2441 updateScanMetrics(scanner.getScanMetrics()); 2442 scanner.close(); 2443 } 2444 } 2445 return true; 2446 } 2447 2448 protected Scan constructScan(byte[] valuePrefix) throws IOException { 2449 FilterList list = new FilterList(); 2450 Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL, 2451 new BinaryComparator(valuePrefix)); 2452 list.addFilter(filter); 2453 if (opts.filterAll) { 2454 list.addFilter(new FilterAllFilter()); 2455 } 2456 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) 2457 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType) 2458 .setScanMetricsEnabled(true); 2459 if (opts.addColumns) { 2460 for (int column = 0; column < opts.columns; column++) { 2461 byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column); 2462 scan.addColumn(FAMILY_ZERO, qualifier); 2463 } 2464 } else { 2465 scan.addFamily(FAMILY_ZERO); 2466 } 2467 scan.setFilter(list); 2468 return scan; 2469 } 2470 } 2471 2472 /** 2473 * Compute a throughput rate in MB/s. 2474 * @param rows Number of records consumed. 2475 * @param timeMs Time taken in milliseconds. 2476 * @return String value with label, ie '123.76 MB/s' 2477 */ 2478 private static String calculateMbps(int rows, long timeMs, final int valueSize, int families, 2479 int columns) { 2480 BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH 2481 + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families); 2482 BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT) 2483 .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT); 2484 return FMT.format(mbps) + " MB/s"; 2485 } 2486 2487 /* 2488 * Format passed integer. 
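* For example (illustrative, using whatever ROW_LENGTH width this class is configured with):
* {@code format(123)} returns the ASCII bytes of the number left-padded with '0' characters to
* ROW_LENGTH digits, and {@code format(-123)} yields the same row key because the absolute value
* is used.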
2489 * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed number (Does 2490 * absolute in case number is negative). 2491 */ 2492 public static byte[] format(final int number) { 2493 byte[] b = new byte[ROW_LENGTH]; 2494 int d = Math.abs(number); 2495 for (int i = b.length - 1; i >= 0; i--) { 2496 b[i] = (byte) ((d % 10) + '0'); 2497 d /= 10; 2498 } 2499 return b; 2500 } 2501 2502 /* 2503 * This method takes some time and is done inline uploading data. For example, doing the mapfile 2504 * test, generation of the key and value consumes about 30% of CPU time. 2505 * @return Generated random value to insert into a table cell. 2506 */ 2507 public static byte[] generateData(final Random r, int length) { 2508 byte[] b = new byte[length]; 2509 int i; 2510 2511 for (i = 0; i < (length - 8); i += 8) { 2512 b[i] = (byte) (65 + r.nextInt(26)); 2513 b[i + 1] = b[i]; 2514 b[i + 2] = b[i]; 2515 b[i + 3] = b[i]; 2516 b[i + 4] = b[i]; 2517 b[i + 5] = b[i]; 2518 b[i + 6] = b[i]; 2519 b[i + 7] = b[i]; 2520 } 2521 2522 byte a = (byte) (65 + r.nextInt(26)); 2523 for (; i < length; i++) { 2524 b[i] = a; 2525 } 2526 return b; 2527 } 2528 2529 static byte[] getRandomRow(final Random random, final int totalRows) { 2530 return format(generateRandomRow(random, totalRows)); 2531 } 2532 2533 static int generateRandomRow(final Random random, final int totalRows) { 2534 return random.nextInt(Integer.MAX_VALUE) % totalRows; 2535 } 2536 2537 static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf, 2538 Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status) 2539 throws IOException, InterruptedException { 2540 status.setStatus( 2541 "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows"); 2542 long totalElapsedTime; 2543 2544 final TestBase t; 2545 try { 2546 if (AsyncTest.class.isAssignableFrom(cmd)) { 2547 Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd; 2548 Constructor<? extends AsyncTest> constructor = 2549 newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class); 2550 t = constructor.newInstance(asyncCon, opts, status); 2551 } else { 2552 Class<? extends Test> newCmd = (Class<? extends Test>) cmd; 2553 Constructor<? extends Test> constructor = 2554 newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class); 2555 t = constructor.newInstance(con, opts, status); 2556 } 2557 } catch (NoSuchMethodException e) { 2558 throw new IllegalArgumentException("Invalid command class: " + cmd.getName() 2559 + ". It does not provide a constructor as described by " 2560 + "the javadoc comment. Available constructors are: " 2561 + Arrays.toString(cmd.getConstructors())); 2562 } catch (Exception e) { 2563 throw new IllegalStateException("Failed to construct command class", e); 2564 } 2565 totalElapsedTime = t.test(); 2566 2567 status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow 2568 + " for " + opts.perClientRunRows + " rows" + " (" 2569 + calculateMbps((int) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime, 2570 getAverageValueLength(opts), opts.families, opts.columns) 2571 + ")"); 2572 2573 return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold, 2574 t.numOfReplyFromReplica, t.getLatencyHistogram()); 2575 } 2576 2577 private static int getAverageValueLength(final TestOptions opts) { 2578 return opts.valueRandom ? 
opts.valueSize / 2 : opts.valueSize; 2579 } 2580 2581 private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) 2582 throws IOException, InterruptedException, ClassNotFoundException, ExecutionException { 2583 // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do 2584 // the TestOptions introspection for us and dump the output in a readable format. 2585 LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts)); 2586 Admin admin = null; 2587 Connection connection = null; 2588 try { 2589 connection = ConnectionFactory.createConnection(getConf()); 2590 admin = connection.getAdmin(); 2591 checkTable(admin, opts); 2592 } finally { 2593 if (admin != null) admin.close(); 2594 if (connection != null) connection.close(); 2595 } 2596 if (opts.nomapred) { 2597 doLocalClients(opts, getConf()); 2598 } else { 2599 doMapReduce(opts, getConf()); 2600 } 2601 } 2602 2603 protected void printUsage() { 2604 printUsage(PE_COMMAND_SHORTNAME, null); 2605 } 2606 2607 protected static void printUsage(final String message) { 2608 printUsage(PE_COMMAND_SHORTNAME, message); 2609 } 2610 2611 protected static void printUsageAndExit(final String message, final int exitCode) { 2612 printUsage(message); 2613 System.exit(exitCode); 2614 } 2615 2616 protected static void printUsage(final String shortName, final String message) { 2617 if (message != null && message.length() > 0) { 2618 System.err.println(message); 2619 } 2620 System.err.print("Usage: hbase " + shortName); 2621 System.err.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>"); 2622 System.err.println(); 2623 System.err.println("General Options:"); 2624 System.err.println( 2625 " nomapred Run multiple clients using threads " + "(rather than via mapreduce)"); 2626 System.err 2627 .println(" oneCon all the threads share the same connection. Default: False"); 2628 System.err.println(" connCount connections all threads share. " 2629 + "For example, if set to 2, then all threads share 2 connections. " 2630 + "Default: depends on the oneCon parameter. If oneCon is set to true, then connCount=1; " 2631 + "if not, connCount=thread number"); 2632 2633 System.err.println(" sampleRate Execute test on a sample of total " 2634 + "rows. Only supported by randomRead. Default: 1.0"); 2635 System.err.println(" period Report every 'period' rows: " 2636 + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10); 2637 System.err.println(" cycles How many times to cycle the test. Defaults: 1."); 2638 System.err.println( 2639 " traceRate Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0"); 2640 System.err.println(" latency Set to report operation latencies. Default: False"); 2641 System.err.println(" latencyThreshold Set to report number of operations with latency " 2642 + "over latencyThreshold, in milliseconds. Default: 0"); 2643 System.err.println(" measureAfter Start to measure the latency once 'measureAfter'" 2644 + " rows have been processed. Default: 0"); 2645 System.err 2646 .println(" valueSize Pass value size to use: Default: " + DEFAULT_OPTS.getValueSize()); 2647 System.err.println(" valueRandom Set if we should vary value size between 0 and " 2648 + "'valueSize'; set on read for stats on size: Default: Not set."); 2649 System.err.println(" blockEncoding Block encoding to use. Value should be one of " 2650 + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE"); 2651 System.err.println(); 2652 System.err.println("Table Creation / Write Tests:"); 2653 System.err.println(" table Alternate table name. Default: 'TestTable'"); 2654 System.err.println( 2655 " rows Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows() 2656 + ". In case of randomReads and randomSeekScans this could" 2657 + " be specified along with --size to specify the number of rows to be scanned within" 2658 + " the total range specified by the size."); 2659 System.err.println( 2660 " size Total size in GiB. Mutually exclusive with --rows for writes and scans" 2661 + ". But for randomReads and randomSeekScans when you use size with --rows you could" 2662 + " use size to specify the end range and --rows" 2663 + " specifies the number of rows within that range. " + "Default: 1.0."); 2664 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); 2665 System.err.println(" encryption Encryption type to use (AES, ...). Default: 'NONE'"); 2666 System.err.println( 2667 " flushCommits Used to determine if the test should flush the table. " + "Default: false"); 2668 System.err.println(" valueZipf Set if we should vary value size between 0 and " 2669 + "'valueSize' in zipf form: Default: Not set."); 2670 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); 2671 System.err.println(" autoFlush Set autoFlush on htable. Default: False"); 2672 System.err.println(" multiPut Batch puts together into groups of N. Only supported " 2673 + "by write. If multiPut is bigger than 0, autoFlush needs to be set to true. Default: 0"); 2674 System.err.println(" presplit Create presplit table. If a table with same name exists," 2675 + " it'll be deleted and recreated (instead of verifying count of its existing regions). " 2676 + "Recommended for accurate perf analysis (see guide). Default: disabled"); 2677 System.err.println( 2678 " usetags Writes tags along with KVs. Use with HFile V3. " + "Default: false"); 2679 System.err.println(" numoftags Specify the number of tags that would be needed. " 2680 + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags); 2681 System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table."); 2682 System.err.println(" columns Columns to write per row. Default: 1"); 2683 System.err 2684 .println(" families Specify number of column families for the table. Default: 1"); 2685 System.err.println(); 2686 System.err.println("Read Tests:"); 2687 System.err.println(" filterAll Helps to filter out all the rows on the server side" 2688 + " thereby not returning anything back to the client. Helps to check the server side" 2689 + " performance. Uses FilterAllFilter internally. "); 2690 System.err.println(" multiGet Batch gets together into groups of N. Only supported " 2691 + "by randomRead. Default: disabled"); 2692 System.err.println(" inmemory Tries to keep the HFiles of the CF " 2693 + "in memory as far as possible. Not guaranteed that reads are always served " 2694 + "from memory. Default: false"); 2695 System.err 2696 .println(" bloomFilter Bloom filter type, one of " + Arrays.toString(BloomType.values())); 2697 System.err.println(" blockSize Blocksize to use when writing out hfiles. "); 2698 System.err 2699 .println(" inmemoryCompaction Makes the column family do in-memory flushes/compactions. " 2700 + "Uses the CompactingMemstore"); 2701 System.err.println(" addColumns Adds columns to scans/gets explicitly. 
Default: true"); 2702 System.err.println(" replicas Enable region replica testing. Defaults: 1."); 2703 System.err.println( 2704 " randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0"); 2705 System.err.println(" caching Scan caching to use. Default: 30"); 2706 System.err.println(" asyncPrefetch Enable asyncPrefetch for scan"); 2707 System.err.println(" cacheBlocks Set the cacheBlocks option for scan. Default: true"); 2708 System.err.println( 2709 " scanReadType Set the readType option for scan, stream/pread/default. Default: default"); 2710 System.err.println(" bufferSize Set the value of client side buffering. Default: 2MB"); 2711 System.err.println(); 2712 System.err.println(" Note: -D properties will be applied to the conf used. "); 2713 System.err.println(" For example: "); 2714 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true"); 2715 System.err.println(" -Dmapreduce.task.timeout=60000"); 2716 System.err.println(); 2717 System.err.println("Command:"); 2718 for (CmdDescriptor command : COMMANDS.values()) { 2719 System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription())); 2720 } 2721 System.err.println(); 2722 System.err.println("Args:"); 2723 System.err.println(" nclients Integer. Required. Total number of clients " 2724 + "(and HRegionServers) running. 1 <= value <= 500"); 2725 System.err.println("Examples:"); 2726 System.err.println(" To run a single client doing the default 1M sequentialWrites:"); 2727 System.err.println(" $ hbase " + shortName + " sequentialWrite 1"); 2728 System.err.println(" To run 10 clients doing increments over ten rows:"); 2729 System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10"); 2730 } 2731 2732 /** 2733 * Parse options passed in via an arguments array. Assumes that array has been split on 2734 * white-space and placed into a {@code Queue}. Any unknown arguments will remain in the queue at 2735 * the conclusion of this method call. It's up to the caller to deal with these unrecognized 2736 * arguments. 
2737 */ 2738 static TestOptions parseOpts(Queue<String> args) { 2739 TestOptions opts = new TestOptions(); 2740 2741 String cmd = null; 2742 while ((cmd = args.poll()) != null) { 2743 if (cmd.equals("-h") || cmd.startsWith("--h")) { 2744 // place item back onto queue so that caller knows parsing was incomplete 2745 args.add(cmd); 2746 break; 2747 } 2748 2749 final String nmr = "--nomapred"; 2750 if (cmd.startsWith(nmr)) { 2751 opts.nomapred = true; 2752 continue; 2753 } 2754 2755 final String rows = "--rows="; 2756 if (cmd.startsWith(rows)) { 2757 opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length())); 2758 continue; 2759 } 2760 2761 final String cycles = "--cycles="; 2762 if (cmd.startsWith(cycles)) { 2763 opts.cycles = Integer.parseInt(cmd.substring(cycles.length())); 2764 continue; 2765 } 2766 2767 final String sampleRate = "--sampleRate="; 2768 if (cmd.startsWith(sampleRate)) { 2769 opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); 2770 continue; 2771 } 2772 2773 final String table = "--table="; 2774 if (cmd.startsWith(table)) { 2775 opts.tableName = cmd.substring(table.length()); 2776 continue; 2777 } 2778 2779 final String startRow = "--startRow="; 2780 if (cmd.startsWith(startRow)) { 2781 opts.startRow = Integer.parseInt(cmd.substring(startRow.length())); 2782 continue; 2783 } 2784 2785 final String compress = "--compress="; 2786 if (cmd.startsWith(compress)) { 2787 opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); 2788 continue; 2789 } 2790 2791 final String encryption = "--encryption="; 2792 if (cmd.startsWith(encryption)) { 2793 opts.encryption = cmd.substring(encryption.length()); 2794 continue; 2795 } 2796 2797 final String traceRate = "--traceRate="; 2798 if (cmd.startsWith(traceRate)) { 2799 opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length())); 2800 continue; 2801 } 2802 2803 final String blockEncoding = "--blockEncoding="; 2804 if (cmd.startsWith(blockEncoding)) { 2805 opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); 2806 continue; 2807 } 2808 2809 final String flushCommits = "--flushCommits="; 2810 if (cmd.startsWith(flushCommits)) { 2811 opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); 2812 continue; 2813 } 2814 2815 final String writeToWAL = "--writeToWAL="; 2816 if (cmd.startsWith(writeToWAL)) { 2817 opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); 2818 continue; 2819 } 2820 2821 final String presplit = "--presplit="; 2822 if (cmd.startsWith(presplit)) { 2823 opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); 2824 continue; 2825 } 2826 2827 final String inMemory = "--inmemory="; 2828 if (cmd.startsWith(inMemory)) { 2829 opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); 2830 continue; 2831 } 2832 2833 final String autoFlush = "--autoFlush="; 2834 if (cmd.startsWith(autoFlush)) { 2835 opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length())); 2836 continue; 2837 } 2838 2839 final String onceCon = "--oneCon="; 2840 if (cmd.startsWith(onceCon)) { 2841 opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length())); 2842 continue; 2843 } 2844 2845 final String connCount = "--connCount="; 2846 if (cmd.startsWith(connCount)) { 2847 opts.connCount = Integer.parseInt(cmd.substring(connCount.length())); 2848 continue; 2849 } 2850 2851 final String latencyThreshold = "--latencyThreshold="; 2852 if (cmd.startsWith(latencyThreshold)) { 
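// The threshold is in milliseconds; operations slower than it are counted and surfaced through
// RunResult as numOfReplyOverLatencyThreshold (see runOneClient).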
2853 opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length())); 2854 continue; 2855 } 2856 2857 final String latency = "--latency"; 2858 if (cmd.startsWith(latency)) { 2859 opts.reportLatency = true; 2860 continue; 2861 } 2862 2863 final String multiGet = "--multiGet="; 2864 if (cmd.startsWith(multiGet)) { 2865 opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); 2866 continue; 2867 } 2868 2869 final String multiPut = "--multiPut="; 2870 if (cmd.startsWith(multiPut)) { 2871 opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length())); 2872 continue; 2873 } 2874 2875 final String useTags = "--usetags="; 2876 if (cmd.startsWith(useTags)) { 2877 opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); 2878 continue; 2879 } 2880 2881 final String noOfTags = "--numoftags="; 2882 if (cmd.startsWith(noOfTags)) { 2883 opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); 2884 continue; 2885 } 2886 2887 final String replicas = "--replicas="; 2888 if (cmd.startsWith(replicas)) { 2889 opts.replicas = Integer.parseInt(cmd.substring(replicas.length())); 2890 continue; 2891 } 2892 2893 final String filterOutAll = "--filterAll"; 2894 if (cmd.startsWith(filterOutAll)) { 2895 opts.filterAll = true; 2896 continue; 2897 } 2898 2899 final String size = "--size="; 2900 if (cmd.startsWith(size)) { 2901 opts.size = Float.parseFloat(cmd.substring(size.length())); 2902 if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB"); 2903 continue; 2904 } 2905 2906 final String splitPolicy = "--splitPolicy="; 2907 if (cmd.startsWith(splitPolicy)) { 2908 opts.splitPolicy = cmd.substring(splitPolicy.length()); 2909 continue; 2910 } 2911 2912 final String randomSleep = "--randomSleep="; 2913 if (cmd.startsWith(randomSleep)) { 2914 opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length())); 2915 continue; 2916 } 2917 2918 final String measureAfter = "--measureAfter="; 2919 if (cmd.startsWith(measureAfter)) { 2920 opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length())); 2921 continue; 2922 } 2923 2924 final String bloomFilter = "--bloomFilter="; 2925 if (cmd.startsWith(bloomFilter)) { 2926 opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length())); 2927 continue; 2928 } 2929 2930 final String blockSize = "--blockSize="; 2931 if (cmd.startsWith(blockSize)) { 2932 opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length())); 2933 continue; 2934 } 2935 2936 final String valueSize = "--valueSize="; 2937 if (cmd.startsWith(valueSize)) { 2938 opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length())); 2939 continue; 2940 } 2941 2942 final String valueRandom = "--valueRandom"; 2943 if (cmd.startsWith(valueRandom)) { 2944 opts.valueRandom = true; 2945 continue; 2946 } 2947 2948 final String valueZipf = "--valueZipf"; 2949 if (cmd.startsWith(valueZipf)) { 2950 opts.valueZipf = true; 2951 continue; 2952 } 2953 2954 final String period = "--period="; 2955 if (cmd.startsWith(period)) { 2956 opts.period = Integer.parseInt(cmd.substring(period.length())); 2957 continue; 2958 } 2959 2960 final String addColumns = "--addColumns="; 2961 if (cmd.startsWith(addColumns)) { 2962 opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length())); 2963 continue; 2964 } 2965 2966 final String inMemoryCompaction = "--inmemoryCompaction="; 2967 if (cmd.startsWith(inMemoryCompaction)) { 2968 opts.inMemoryCompaction = 2969 
MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length())); 2970 continue; 2971 } 2972 2973 final String columns = "--columns="; 2974 if (cmd.startsWith(columns)) { 2975 opts.columns = Integer.parseInt(cmd.substring(columns.length())); 2976 continue; 2977 } 2978 2979 final String families = "--families="; 2980 if (cmd.startsWith(families)) { 2981 opts.families = Integer.parseInt(cmd.substring(families.length())); 2982 continue; 2983 } 2984 2985 final String caching = "--caching="; 2986 if (cmd.startsWith(caching)) { 2987 opts.caching = Integer.parseInt(cmd.substring(caching.length())); 2988 continue; 2989 } 2990 2991 final String asyncPrefetch = "--asyncPrefetch"; 2992 if (cmd.startsWith(asyncPrefetch)) { 2993 opts.asyncPrefetch = true; 2994 continue; 2995 } 2996 2997 final String cacheBlocks = "--cacheBlocks="; 2998 if (cmd.startsWith(cacheBlocks)) { 2999 opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length())); 3000 continue; 3001 } 3002 3003 final String scanReadType = "--scanReadType="; 3004 if (cmd.startsWith(scanReadType)) { 3005 opts.scanReadType = 3006 Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase()); 3007 continue; 3008 } 3009 3010 final String bufferSize = "--bufferSize="; 3011 if (cmd.startsWith(bufferSize)) { 3012 opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length())); 3013 continue; 3014 } 3015 3016 validateParsedOpts(opts); 3017 3018 if (isCommandClass(cmd)) { 3019 opts.cmdName = cmd; 3020 try { 3021 opts.numClientThreads = Integer.parseInt(args.remove()); 3022 } catch (NoSuchElementException | NumberFormatException e) { 3023 throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e); 3024 } 3025 opts = calculateRowsAndSize(opts); 3026 break; 3027 } else { 3028 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); 3029 } 3030 3031 // Not matching any option or command. 
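// Note: printUsageAndExit above terminates the JVM via System.exit, so this fallback is only
// reached if that exit behaviour ever changes.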
3032 System.err.println("Error: Wrong option or command: " + cmd); 3033 args.add(cmd); 3034 break; 3035 } 3036 return opts; 3037 } 3038 3039 /** 3040 * Validates opts after all the opts are parsed, so that the caller need not maintain the order of opts 3041 */ 3042 private static void validateParsedOpts(TestOptions opts) { 3043 3044 if (!opts.autoFlush && opts.multiPut > 0) { 3045 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0"); 3046 } 3047 3048 if (opts.oneCon && opts.connCount > 1) { 3049 throw new IllegalArgumentException( 3050 "oneCon is set to true, " + "connCount should not be bigger than 1"); 3051 } 3052 3053 if (opts.valueZipf && opts.valueRandom) { 3054 throw new IllegalStateException("Either valueZipf or valueRandom but not both"); 3055 } 3056 } 3057 3058 static TestOptions calculateRowsAndSize(final TestOptions opts) { 3059 int rowsPerGB = getRowsPerGB(opts); 3060 if ( 3061 (opts.getCmdName() != null 3062 && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN))) 3063 && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows 3064 ) { 3065 opts.totalRows = (int) opts.size * rowsPerGB; 3066 } else if (opts.size != DEFAULT_OPTS.size) { 3067 // total size in GB specified 3068 opts.totalRows = (int) opts.size * rowsPerGB; 3069 opts.perClientRunRows = opts.totalRows / opts.numClientThreads; 3070 } else { 3071 opts.totalRows = opts.perClientRunRows * opts.numClientThreads; 3072 opts.size = opts.totalRows / rowsPerGB; 3073 } 3074 return opts; 3075 } 3076 3077 static int getRowsPerGB(final TestOptions opts) { 3078 return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies() 3079 * opts.getColumns()); 3080 } 3081 3082 @Override 3083 public int run(String[] args) throws Exception { 3084 // Process command-line args. TODO: Better cmd-line processing 3085 // (but hopefully something not as painful as cli options). 3086 int errCode = -1; 3087 if (args.length < 1) { 3088 printUsage(); 3089 return errCode; 3090 } 3091 3092 try { 3093 LinkedList<String> argv = new LinkedList<>(); 3094 argv.addAll(Arrays.asList(args)); 3095 TestOptions opts = parseOpts(argv); 3096 3097 // args remaining, print help and exit 3098 if (!argv.isEmpty()) { 3099 errCode = 0; 3100 printUsage(); 3101 return errCode; 3102 } 3103 3104 // must run at least 1 client 3105 if (opts.numClientThreads <= 0) { 3106 throw new IllegalArgumentException("Number of clients must be > 0"); 3107 } 3108 3109 // cmdName should not be null, print help and exit 3110 if (opts.cmdName == null) { 3111 printUsage(); 3112 return errCode; 3113 } 3114 3115 Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName); 3116 if (cmdClass != null) { 3117 runTest(cmdClass, opts); 3118 errCode = 0; 3119 } 3120 3121 } catch (Exception e) { 3122 e.printStackTrace(); 3123 } 3124 3125 return errCode; 3126 } 3127 3128 private static boolean isCommandClass(String cmd) { 3129 return COMMANDS.containsKey(cmd); 3130 } 3131 3132 private static Class<? extends TestBase> determineCommandClass(String cmd) { 3133 CmdDescriptor descriptor = COMMANDS.get(cmd); 3134 return descriptor != null ? descriptor.getCmdClass() : null; 3135 } 3136 3137 public static void main(final String[] args) throws Exception { 3138 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); 3139 System.exit(res); 3140 } 3141}