001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import com.codahale.metrics.Histogram;
021import com.codahale.metrics.UniformReservoir;
022import io.opentelemetry.api.trace.Span;
023import io.opentelemetry.context.Scope;
024import java.io.IOException;
025import java.io.PrintStream;
026import java.lang.reflect.Constructor;
027import java.math.BigDecimal;
028import java.math.MathContext;
029import java.text.DecimalFormat;
030import java.text.SimpleDateFormat;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Date;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Locale;
037import java.util.Map;
038import java.util.NoSuchElementException;
039import java.util.Properties;
040import java.util.Queue;
041import java.util.Random;
042import java.util.TreeMap;
043import java.util.concurrent.Callable;
044import java.util.concurrent.ExecutionException;
045import java.util.concurrent.ExecutorService;
046import java.util.concurrent.Executors;
047import java.util.concurrent.Future;
048import java.util.concurrent.ThreadLocalRandom;
049import org.apache.commons.lang3.StringUtils;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.conf.Configured;
052import org.apache.hadoop.fs.FileSystem;
053import org.apache.hadoop.fs.Path;
054import org.apache.hadoop.hbase.client.Admin;
055import org.apache.hadoop.hbase.client.Append;
056import org.apache.hadoop.hbase.client.AsyncConnection;
057import org.apache.hadoop.hbase.client.AsyncTable;
058import org.apache.hadoop.hbase.client.BufferedMutator;
059import org.apache.hadoop.hbase.client.BufferedMutatorParams;
060import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
061import org.apache.hadoop.hbase.client.Connection;
062import org.apache.hadoop.hbase.client.ConnectionFactory;
063import org.apache.hadoop.hbase.client.Consistency;
064import org.apache.hadoop.hbase.client.Delete;
065import org.apache.hadoop.hbase.client.Durability;
066import org.apache.hadoop.hbase.client.Get;
067import org.apache.hadoop.hbase.client.Increment;
068import org.apache.hadoop.hbase.client.Put;
069import org.apache.hadoop.hbase.client.RegionInfo;
070import org.apache.hadoop.hbase.client.RegionInfoBuilder;
071import org.apache.hadoop.hbase.client.RegionLocator;
072import org.apache.hadoop.hbase.client.Result;
073import org.apache.hadoop.hbase.client.ResultScanner;
074import org.apache.hadoop.hbase.client.RowMutations;
075import org.apache.hadoop.hbase.client.Scan;
076import org.apache.hadoop.hbase.client.Table;
077import org.apache.hadoop.hbase.client.TableDescriptor;
078import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
079import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
080import org.apache.hadoop.hbase.filter.BinaryComparator;
081import org.apache.hadoop.hbase.filter.Filter;
082import org.apache.hadoop.hbase.filter.FilterAllFilter;
083import org.apache.hadoop.hbase.filter.FilterList;
084import org.apache.hadoop.hbase.filter.PageFilter;
085import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
086import org.apache.hadoop.hbase.filter.WhileMatchFilter;
087import org.apache.hadoop.hbase.io.compress.Compression;
088import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
089import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
090import org.apache.hadoop.hbase.regionserver.BloomType;
091import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
092import org.apache.hadoop.hbase.trace.TraceUtil;
093import org.apache.hadoop.hbase.util.ByteArrayHashKey;
094import org.apache.hadoop.hbase.util.Bytes;
095import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
096import org.apache.hadoop.hbase.util.GsonUtil;
097import org.apache.hadoop.hbase.util.Hash;
098import org.apache.hadoop.hbase.util.MurmurHash;
099import org.apache.hadoop.hbase.util.Pair;
100import org.apache.hadoop.hbase.util.RandomDistribution;
101import org.apache.hadoop.hbase.util.YammerHistogramUtils;
102import org.apache.hadoop.io.LongWritable;
103import org.apache.hadoop.io.Text;
104import org.apache.hadoop.mapreduce.Job;
105import org.apache.hadoop.mapreduce.Mapper;
106import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
107import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
108import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
109import org.apache.hadoop.util.Tool;
110import org.apache.hadoop.util.ToolRunner;
111import org.apache.yetus.audience.InterfaceAudience;
112import org.slf4j.Logger;
113import org.slf4j.LoggerFactory;
114
115import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
116import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
117import org.apache.hbase.thirdparty.com.google.gson.Gson;
118
119/**
120 * Script used evaluating HBase performance and scalability. Runs a HBase client that steps through
121 * one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random writes test,
122 * etc.). Pass on the command-line which test to run and how many clients are participating in this
123 * experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
124 * <p>
125 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance
126 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper,
127 * pages 8-10.
128 * <p>
129 * By default, runs as a mapreduce job where each mapper runs a single test client. Can also run as
130 * a non-mapreduce, multithreaded application by specifying {@code --nomapred}. Each client does
131 * about 1GB of data, unless specified otherwise.
132 */
133@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
134public class PerformanceEvaluation extends Configured implements Tool {
135  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
136  static final String RANDOM_READ = "randomRead";
137  static final String PE_COMMAND_SHORTNAME = "pe";
138  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
139  private static final Gson GSON = GsonUtil.createGson().create();
140
141  public static final String TABLE_NAME = "TestTable";
142  public static final String FAMILY_NAME_BASE = "info";
143  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
144  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
145  public static final int DEFAULT_VALUE_LENGTH = 1000;
146  public static final int ROW_LENGTH = 26;
147
148  private static final int ONE_GB = 1024 * 1024 * 1000;
149  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
150  // TODO : should we make this configurable
151  private static final int TAG_LENGTH = 256;
152  private static final DecimalFormat FMT = new DecimalFormat("0.##");
153  private static final MathContext CXT = MathContext.DECIMAL64;
154  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
155  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
156  private static final TestOptions DEFAULT_OPTS = new TestOptions();
157
158  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
159  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
160
161  static {
162    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
163      "Run async random read test");
164    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
165      "Run async random write test");
166    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
167      "Run async sequential read test");
168    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
169      "Run async sequential write test");
170    addCommandDescriptor(AsyncScanTest.class, "asyncScan", "Run async scan test (read every row)");
171    addCommandDescriptor(RandomReadTest.class, RANDOM_READ, "Run random read test");
172    addCommandDescriptor(MetaRandomReadTest.class, "metaRandomRead", "Run getRegionLocation test");
173    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
174      "Run random seek and scan 100 test");
175    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
176      "Run random seek scan with both start and stop row (max 10 rows)");
177    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
178      "Run random seek scan with both start and stop row (max 100 rows)");
179    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
180      "Run random seek scan with both start and stop row (max 1000 rows)");
181    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
182      "Run random seek scan with both start and stop row (max 10000 rows)");
183    addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
184    addCommandDescriptor(RandomDeleteTest.class, "randomDelete", "Run random delete test");
185    addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
186    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
187    addCommandDescriptor(SequentialDeleteTest.class, "sequentialDelete",
188      "Run sequential delete test");
189    addCommandDescriptor(MetaWriteTest.class, "metaWrite",
190      "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta");
191    addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
192    addCommandDescriptor(ReverseScanTest.class, "reverseScan",
193      "Run reverse scan test (read every row)");
194    addCommandDescriptor(FilteredScanTest.class, "filterScan",
195      "Run scan test using a filter to find a specific row based on it's value "
196        + "(make sure to use --rows=20)");
197    addCommandDescriptor(IncrementTest.class, "increment",
198      "Increment on each row; clients overlap on keyspace so some concurrent operations");
199    addCommandDescriptor(AppendTest.class, "append",
200      "Append on each row; clients overlap on keyspace so some concurrent operations");
201    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
202      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
203    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
204      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
205    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
206      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
207    addCommandDescriptor(CleanMetaTest.class, "cleanMeta",
208      "Remove fake region entries on meta table inserted by metaWrite; used with 1 thread");
209  }
210
211  /**
212   * Enum for map metrics. Keep it out here rather than inside in the Map inner-class so we can find
213   * associated properties.
214   */
215  protected static enum Counter {
216    /** elapsed time */
217    ELAPSED_TIME,
218    /** number of rows */
219    ROWS
220  }
221
222  protected static class RunResult implements Comparable<RunResult> {
223    public RunResult(long duration, Histogram hist) {
224      this.duration = duration;
225      this.hist = hist;
226      numbOfReplyOverThreshold = 0;
227      numOfReplyFromReplica = 0;
228    }
229
230    public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica,
231      Histogram hist) {
232      this.duration = duration;
233      this.hist = hist;
234      this.numbOfReplyOverThreshold = numbOfReplyOverThreshold;
235      this.numOfReplyFromReplica = numOfReplyFromReplica;
236    }
237
238    public final long duration;
239    public final Histogram hist;
240    public final long numbOfReplyOverThreshold;
241    public final long numOfReplyFromReplica;
242
243    @Override
244    public String toString() {
245      return Long.toString(duration);
246    }
247
248    @Override
249    public int compareTo(RunResult o) {
250      return Long.compare(this.duration, o.duration);
251    }
252
253    @Override
254    public boolean equals(Object obj) {
255      if (this == obj) {
256        return true;
257      }
258      if (obj == null || getClass() != obj.getClass()) {
259        return false;
260      }
261      return this.compareTo((RunResult) obj) == 0;
262    }
263
264    @Override
265    public int hashCode() {
266      return Long.hashCode(duration);
267    }
268  }
269
270  /**
271   * Constructor
272   * @param conf Configuration object
273   */
274  public PerformanceEvaluation(final Configuration conf) {
275    super(conf);
276  }
277
278  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass, String name,
279    String description) {
280    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
281    COMMANDS.put(name, cmdDescriptor);
282  }
283
284  /**
285   * Implementations can have their status set.
286   */
287  interface Status {
288    /**
289     * Sets status
290     * @param msg status message
291     */
292    void setStatus(final String msg) throws IOException;
293  }
294
295  /**
296   * MapReduce job that runs a performance evaluation client in each map task.
297   */
298  public static class EvaluationMapTask
299    extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
300
301    /** configuration parameter name that contains the command */
302    public final static String CMD_KEY = "EvaluationMapTask.command";
303    /** configuration parameter name that contains the PE impl */
304    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
305
306    private Class<? extends Test> cmd;
307
308    @Override
309    protected void setup(Context context) throws IOException, InterruptedException {
310      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
311
312      // this is required so that extensions of PE are instantiated within the
313      // map reduce task...
314      Class<? extends PerformanceEvaluation> peClass =
315        forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
316      try {
317        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
318      } catch (Exception e) {
319        throw new IllegalStateException("Could not instantiate PE instance", e);
320      }
321    }
322
323    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
324      try {
325        return Class.forName(className).asSubclass(type);
326      } catch (ClassNotFoundException e) {
327        throw new IllegalStateException("Could not find class for name: " + className, e);
328      }
329    }
330
331    @Override
332    protected void map(LongWritable key, Text value, final Context context)
333      throws IOException, InterruptedException {
334
335      Status status = new Status() {
336        @Override
337        public void setStatus(String msg) {
338          context.setStatus(msg);
339        }
340      };
341
342      TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class);
343      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
344      final Connection con = ConnectionFactory.createConnection(conf);
345      AsyncConnection asyncCon = null;
346      try {
347        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
348      } catch (ExecutionException e) {
349        throw new IOException(e);
350      }
351
352      // Evaluation task
353      RunResult result =
354        PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
355      // Collect how much time the thing took. Report as map output and
356      // to the ELAPSED_TIME counter.
357      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
358      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
359      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
360      context.progress();
361    }
362  }
363
364  /*
365   * If table does not already exist, create. Also create a table when {@code opts.presplitRegions}
366   * is specified or when the existing table's region replica count doesn't match {@code
367   * opts.replicas}.
368   */
369  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
370    TableName tableName = TableName.valueOf(opts.tableName);
371    boolean needsDelete = false, exists = admin.tableExists(tableName);
372    boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
373      || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
374    boolean isDeleteCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("delete");
375    if (!exists && (isReadCmd || isDeleteCmd)) {
376      throw new IllegalStateException(
377        "Must specify an existing table for read commands. Run a write command first.");
378    }
379    TableDescriptor desc = exists ? admin.getDescriptor(TableName.valueOf(opts.tableName)) : null;
380    byte[][] splits = getSplits(opts);
381
382    // recreate the table when user has requested presplit or when existing
383    // {RegionSplitPolicy,replica count} does not match requested, or when the
384    // number of column families does not match requested.
385    if (
386      (exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions
387        && opts.presplitRegions != admin.getRegions(tableName).size())
388        || (!isReadCmd && desc != null
389          && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
390        || (!(isReadCmd || isDeleteCmd) && desc != null
391          && desc.getRegionReplication() != opts.replicas)
392        || (desc != null && desc.getColumnFamilyCount() != opts.families)
393    ) {
394      needsDelete = true;
395      // wait, why did it delete my table?!?
396      LOG.debug(MoreObjects.toStringHelper("needsDelete").add("needsDelete", needsDelete)
397        .add("isReadCmd", isReadCmd).add("exists", exists).add("desc", desc)
398        .add("presplit", opts.presplitRegions).add("splitPolicy", opts.splitPolicy)
399        .add("replicas", opts.replicas).add("families", opts.families).toString());
400    }
401
402    // remove an existing table
403    if (needsDelete) {
404      if (admin.isTableEnabled(tableName)) {
405        admin.disableTable(tableName);
406      }
407      admin.deleteTable(tableName);
408    }
409
410    // table creation is necessary
411    if (!exists || needsDelete) {
412      desc = getTableDescriptor(opts);
413      if (splits != null) {
414        if (LOG.isDebugEnabled()) {
415          for (int i = 0; i < splits.length; i++) {
416            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
417          }
418        }
419      }
420      if (splits != null) {
421        admin.createTable(desc, splits);
422      } else {
423        admin.createTable(desc);
424      }
425      LOG.info("Table " + desc + " created");
426    }
427    return admin.tableExists(tableName);
428  }
429
430  /**
431   * Create an HTableDescriptor from provided TestOptions.
432   */
433  protected static TableDescriptor getTableDescriptor(TestOptions opts) {
434    TableDescriptorBuilder builder =
435      TableDescriptorBuilder.newBuilder(TableName.valueOf(opts.tableName));
436
437    for (int family = 0; family < opts.families; family++) {
438      byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
439      ColumnFamilyDescriptorBuilder cfBuilder =
440        ColumnFamilyDescriptorBuilder.newBuilder(familyName);
441      cfBuilder.setDataBlockEncoding(opts.blockEncoding);
442      cfBuilder.setCompressionType(opts.compression);
443      cfBuilder.setEncryptionType(opts.encryption);
444      cfBuilder.setBloomFilterType(opts.bloomType);
445      cfBuilder.setBlocksize(opts.blockSize);
446      if (opts.inMemoryCF) {
447        cfBuilder.setInMemory(true);
448      }
449      cfBuilder.setInMemoryCompaction(opts.inMemoryCompaction);
450      builder.setColumnFamily(cfBuilder.build());
451    }
452    if (opts.replicas != DEFAULT_OPTS.replicas) {
453      builder.setRegionReplication(opts.replicas);
454    }
455    if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
456      builder.setRegionSplitPolicyClassName(opts.splitPolicy);
457    }
458    return builder.build();
459  }
460
461  /**
462   * generates splits based on total number of rows and specified split regions
463   */
464  protected static byte[][] getSplits(TestOptions opts) {
465    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) return null;
466
467    int numSplitPoints = opts.presplitRegions - 1;
468    byte[][] splits = new byte[numSplitPoints][];
469    long jump = opts.totalRows / opts.presplitRegions;
470    for (int i = 0; i < numSplitPoints; i++) {
471      long rowkey = jump * (1 + i);
472      splits[i] = format(rowkey);
473    }
474    return splits;
475  }
476
477  static void setupConnectionCount(final TestOptions opts) {
478    if (opts.oneCon) {
479      opts.connCount = 1;
480    } else {
481      if (opts.connCount == -1) {
482        // set to thread number if connCount is not set
483        opts.connCount = opts.numClientThreads;
484      }
485    }
486  }
487
488  /*
489   * Run all clients in this vm each to its own thread.
490   */
491  static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
492    throws IOException, InterruptedException, ExecutionException {
493    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
494    assert cmd != null;
495    @SuppressWarnings("unchecked")
496    Future<RunResult>[] threads = new Future[opts.numClientThreads];
497    RunResult[] results = new RunResult[opts.numClientThreads];
498    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
499      new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
500    setupConnectionCount(opts);
501    final Connection[] cons = new Connection[opts.connCount];
502    final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
503    for (int i = 0; i < opts.connCount; i++) {
504      cons[i] = ConnectionFactory.createConnection(conf);
505      asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
506    }
507    LOG
508      .info("Created " + opts.connCount + " connections for " + opts.numClientThreads + " threads");
509    for (int i = 0; i < threads.length; i++) {
510      final int index = i;
511      threads[i] = pool.submit(new Callable<RunResult>() {
512        @Override
513        public RunResult call() throws Exception {
514          TestOptions threadOpts = new TestOptions(opts);
515          final Connection con = cons[index % cons.length];
516          final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
517          if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
518          RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
519            @Override
520            public void setStatus(final String msg) throws IOException {
521              LOG.info(msg);
522            }
523          });
524          LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration
525            + "ms over " + threadOpts.perClientRunRows + " rows");
526          if (opts.latencyThreshold > 0) {
527            LOG.info("Number of replies over latency threshold " + opts.latencyThreshold
528              + "(ms) is " + run.numbOfReplyOverThreshold);
529          }
530          return run;
531        }
532      });
533    }
534    pool.shutdown();
535
536    for (int i = 0; i < threads.length; i++) {
537      try {
538        results[i] = threads[i].get();
539      } catch (ExecutionException e) {
540        throw new IOException(e.getCause());
541      }
542    }
543    final String test = cmd.getSimpleName();
544    LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(results));
545    Arrays.sort(results);
546    long total = 0;
547    float avgLatency = 0;
548    float avgTPS = 0;
549    long replicaWins = 0;
550    for (RunResult result : results) {
551      total += result.duration;
552      avgLatency += result.hist.getSnapshot().getMean();
553      avgTPS += opts.perClientRunRows * 1.0f / result.duration;
554      replicaWins += result.numOfReplyFromReplica;
555    }
556    avgTPS *= 1000; // ms to second
557    avgLatency = avgLatency / results.length;
558    LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: "
559      + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms");
560    LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
561    LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
562    if (opts.replicas > 1) {
563      LOG.info("[results from replica regions] " + replicaWins);
564    }
565
566    for (int i = 0; i < opts.connCount; i++) {
567      cons[i].close();
568      asyncCons[i].close();
569    }
570
571    return results;
572  }
573
574  /*
575   * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write
576   * out an input file with instruction per client regards which row they are to start on.
577   * @param cmd Command to run.
578   */
579  static Job doMapReduce(TestOptions opts, final Configuration conf)
580    throws IOException, InterruptedException, ClassNotFoundException {
581    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
582    assert cmd != null;
583    Path inputDir = writeInputFile(conf, opts);
584    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
585    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
586    Job job = Job.getInstance(conf);
587    job.setJarByClass(PerformanceEvaluation.class);
588    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
589
590    job.setInputFormatClass(NLineInputFormat.class);
591    NLineInputFormat.setInputPaths(job, inputDir);
592    // this is default, but be explicit about it just in case.
593    NLineInputFormat.setNumLinesPerSplit(job, 1);
594
595    job.setOutputKeyClass(LongWritable.class);
596    job.setOutputValueClass(LongWritable.class);
597
598    job.setMapperClass(EvaluationMapTask.class);
599    job.setReducerClass(LongSumReducer.class);
600
601    job.setNumReduceTasks(1);
602
603    job.setOutputFormatClass(TextOutputFormat.class);
604    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
605
606    TableMapReduceUtil.addDependencyJars(job);
607    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Histogram.class, // yammer
608                                                                                            // metrics
609      Gson.class, // gson
610      FilterAllFilter.class // hbase-server tests jar
611    );
612
613    TableMapReduceUtil.initCredentials(job);
614
615    job.waitForCompletion(true);
616    return job;
617  }
618
619  /**
620   * Each client has one mapper to do the work, and client do the resulting count in a map task.
621   */
622
623  static String JOB_INPUT_FILENAME = "input.txt";
624
625  /*
626   * Write input file of offsets-per-client for the mapreduce job.
627   * @param c Configuration
628   * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
629   */
630  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
631    return writeInputFile(c, opts, new Path("."));
632  }
633
634  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
635    throws IOException {
636    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
637    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
638    Path inputDir = new Path(jobdir, "inputs");
639
640    FileSystem fs = FileSystem.get(c);
641    fs.mkdirs(inputDir);
642
643    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
644    PrintStream out = new PrintStream(fs.create(inputFile));
645    // Make input random.
646    Map<Integer, String> m = new TreeMap<>();
647    Hash h = MurmurHash.getInstance();
648    long perClientRows = (opts.totalRows / opts.numClientThreads);
649    try {
650      for (int j = 0; j < opts.numClientThreads; j++) {
651        TestOptions next = new TestOptions(opts);
652        next.startRow = j * perClientRows;
653        next.perClientRunRows = perClientRows;
654        String s = GSON.toJson(next);
655        LOG.info("Client=" + j + ", input=" + s);
656        byte[] b = Bytes.toBytes(s);
657        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
658        m.put(hash, s);
659      }
660      for (Map.Entry<Integer, String> e : m.entrySet()) {
661        out.println(e.getValue());
662      }
663    } finally {
664      out.close();
665    }
666    return inputDir;
667  }
668
669  /**
670   * Describes a command.
671   */
672  static class CmdDescriptor {
673    private Class<? extends TestBase> cmdClass;
674    private String name;
675    private String description;
676
677    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
678      this.cmdClass = cmdClass;
679      this.name = name;
680      this.description = description;
681    }
682
683    public Class<? extends TestBase> getCmdClass() {
684      return cmdClass;
685    }
686
687    public String getName() {
688      return name;
689    }
690
691    public String getDescription() {
692      return description;
693    }
694  }
695
696  /**
697   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. This makes
698   * tracking all these arguments a little easier. NOTE: ADDING AN OPTION, you need to add a data
699   * member, a getter/setter (to make JSON serialization of this TestOptions class behave), and you
700   * need to add to the clone constructor below copying your new option from the 'that' to the
701   * 'this'. Look for 'clone' below.
702   */
703  static class TestOptions {
704    String cmdName = null;
705    boolean nomapred = false;
706    boolean filterAll = false;
707    long startRow = 0;
708    float size = 1.0f;
709    long perClientRunRows = DEFAULT_ROWS_PER_GB;
710    int numClientThreads = 1;
711    long totalRows = DEFAULT_ROWS_PER_GB;
712    int measureAfter = 0;
713    float sampleRate = 1.0f;
714    /**
715     * @deprecated Useless after switching to OpenTelemetry
716     */
717    @Deprecated
718    double traceRate = 0.0;
719    String tableName = TABLE_NAME;
720    boolean flushCommits = true;
721    boolean writeToWAL = true;
722    boolean autoFlush = false;
723    boolean oneCon = false;
724    int connCount = -1; // wil decide the actual num later
725    boolean useTags = false;
726    int noOfTags = 1;
727    boolean reportLatency = false;
728    int multiGet = 0;
729    int multiPut = 0;
730    int randomSleep = 0;
731    boolean inMemoryCF = false;
732    int presplitRegions = 0;
733    int replicas = TableDescriptorBuilder.DEFAULT_REGION_REPLICATION;
734    String splitPolicy = null;
735    Compression.Algorithm compression = Compression.Algorithm.NONE;
736    String encryption = null;
737    BloomType bloomType = BloomType.ROW;
738    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
739    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
740    boolean valueRandom = false;
741    boolean valueZipf = false;
742    int valueSize = DEFAULT_VALUE_LENGTH;
743    long period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10;
744    int cycles = 1;
745    int columns = 1;
746    int families = 1;
747    int caching = 30;
748    int latencyThreshold = 0; // in millsecond
749    boolean addColumns = true;
750    MemoryCompactionPolicy inMemoryCompaction =
751      MemoryCompactionPolicy.valueOf(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
752    boolean asyncPrefetch = false;
753    boolean cacheBlocks = true;
754    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
755    long bufferSize = 2l * 1024l * 1024l;
756    Properties commandProperties;
757
758    public TestOptions() {
759    }
760
761    /**
762     * Clone constructor.
763     * @param that Object to copy from.
764     */
765    public TestOptions(TestOptions that) {
766      this.cmdName = that.cmdName;
767      this.cycles = that.cycles;
768      this.nomapred = that.nomapred;
769      this.startRow = that.startRow;
770      this.size = that.size;
771      this.perClientRunRows = that.perClientRunRows;
772      this.numClientThreads = that.numClientThreads;
773      this.totalRows = that.totalRows;
774      this.sampleRate = that.sampleRate;
775      this.traceRate = that.traceRate;
776      this.tableName = that.tableName;
777      this.flushCommits = that.flushCommits;
778      this.writeToWAL = that.writeToWAL;
779      this.autoFlush = that.autoFlush;
780      this.oneCon = that.oneCon;
781      this.connCount = that.connCount;
782      this.useTags = that.useTags;
783      this.noOfTags = that.noOfTags;
784      this.reportLatency = that.reportLatency;
785      this.latencyThreshold = that.latencyThreshold;
786      this.multiGet = that.multiGet;
787      this.multiPut = that.multiPut;
788      this.inMemoryCF = that.inMemoryCF;
789      this.presplitRegions = that.presplitRegions;
790      this.replicas = that.replicas;
791      this.splitPolicy = that.splitPolicy;
792      this.compression = that.compression;
793      this.encryption = that.encryption;
794      this.blockEncoding = that.blockEncoding;
795      this.filterAll = that.filterAll;
796      this.bloomType = that.bloomType;
797      this.blockSize = that.blockSize;
798      this.valueRandom = that.valueRandom;
799      this.valueZipf = that.valueZipf;
800      this.valueSize = that.valueSize;
801      this.period = that.period;
802      this.randomSleep = that.randomSleep;
803      this.measureAfter = that.measureAfter;
804      this.addColumns = that.addColumns;
805      this.columns = that.columns;
806      this.families = that.families;
807      this.caching = that.caching;
808      this.inMemoryCompaction = that.inMemoryCompaction;
809      this.asyncPrefetch = that.asyncPrefetch;
810      this.cacheBlocks = that.cacheBlocks;
811      this.scanReadType = that.scanReadType;
812      this.bufferSize = that.bufferSize;
813      this.commandProperties = that.commandProperties;
814    }
815
816    public Properties getCommandProperties() {
817      return commandProperties;
818    }
819
820    public int getCaching() {
821      return this.caching;
822    }
823
824    public void setCaching(final int caching) {
825      this.caching = caching;
826    }
827
828    public int getColumns() {
829      return this.columns;
830    }
831
832    public void setColumns(final int columns) {
833      this.columns = columns;
834    }
835
836    public int getFamilies() {
837      return this.families;
838    }
839
840    public void setFamilies(final int families) {
841      this.families = families;
842    }
843
844    public int getCycles() {
845      return this.cycles;
846    }
847
848    public void setCycles(final int cycles) {
849      this.cycles = cycles;
850    }
851
852    public boolean isValueZipf() {
853      return valueZipf;
854    }
855
856    public void setValueZipf(boolean valueZipf) {
857      this.valueZipf = valueZipf;
858    }
859
860    public String getCmdName() {
861      return cmdName;
862    }
863
864    public void setCmdName(String cmdName) {
865      this.cmdName = cmdName;
866    }
867
868    public int getRandomSleep() {
869      return randomSleep;
870    }
871
872    public void setRandomSleep(int randomSleep) {
873      this.randomSleep = randomSleep;
874    }
875
876    public int getReplicas() {
877      return replicas;
878    }
879
880    public void setReplicas(int replicas) {
881      this.replicas = replicas;
882    }
883
884    public String getSplitPolicy() {
885      return splitPolicy;
886    }
887
888    public void setSplitPolicy(String splitPolicy) {
889      this.splitPolicy = splitPolicy;
890    }
891
892    public void setNomapred(boolean nomapred) {
893      this.nomapred = nomapred;
894    }
895
896    public void setFilterAll(boolean filterAll) {
897      this.filterAll = filterAll;
898    }
899
900    public void setStartRow(long startRow) {
901      this.startRow = startRow;
902    }
903
904    public void setSize(float size) {
905      this.size = size;
906    }
907
908    public void setPerClientRunRows(int perClientRunRows) {
909      this.perClientRunRows = perClientRunRows;
910    }
911
912    public void setNumClientThreads(int numClientThreads) {
913      this.numClientThreads = numClientThreads;
914    }
915
916    public void setTotalRows(long totalRows) {
917      this.totalRows = totalRows;
918    }
919
920    public void setSampleRate(float sampleRate) {
921      this.sampleRate = sampleRate;
922    }
923
924    public void setTraceRate(double traceRate) {
925      this.traceRate = traceRate;
926    }
927
928    public void setTableName(String tableName) {
929      this.tableName = tableName;
930    }
931
932    public void setFlushCommits(boolean flushCommits) {
933      this.flushCommits = flushCommits;
934    }
935
936    public void setWriteToWAL(boolean writeToWAL) {
937      this.writeToWAL = writeToWAL;
938    }
939
940    public void setAutoFlush(boolean autoFlush) {
941      this.autoFlush = autoFlush;
942    }
943
944    public void setOneCon(boolean oneCon) {
945      this.oneCon = oneCon;
946    }
947
948    public int getConnCount() {
949      return connCount;
950    }
951
952    public void setConnCount(int connCount) {
953      this.connCount = connCount;
954    }
955
956    public void setUseTags(boolean useTags) {
957      this.useTags = useTags;
958    }
959
960    public void setNoOfTags(int noOfTags) {
961      this.noOfTags = noOfTags;
962    }
963
964    public void setReportLatency(boolean reportLatency) {
965      this.reportLatency = reportLatency;
966    }
967
968    public void setMultiGet(int multiGet) {
969      this.multiGet = multiGet;
970    }
971
972    public void setMultiPut(int multiPut) {
973      this.multiPut = multiPut;
974    }
975
976    public void setInMemoryCF(boolean inMemoryCF) {
977      this.inMemoryCF = inMemoryCF;
978    }
979
980    public void setPresplitRegions(int presplitRegions) {
981      this.presplitRegions = presplitRegions;
982    }
983
984    public void setCompression(Compression.Algorithm compression) {
985      this.compression = compression;
986    }
987
988    public void setEncryption(String encryption) {
989      this.encryption = encryption;
990    }
991
992    public void setBloomType(BloomType bloomType) {
993      this.bloomType = bloomType;
994    }
995
996    public void setBlockSize(int blockSize) {
997      this.blockSize = blockSize;
998    }
999
1000    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
1001      this.blockEncoding = blockEncoding;
1002    }
1003
1004    public void setValueRandom(boolean valueRandom) {
1005      this.valueRandom = valueRandom;
1006    }
1007
1008    public void setValueSize(int valueSize) {
1009      this.valueSize = valueSize;
1010    }
1011
1012    public void setBufferSize(long bufferSize) {
1013      this.bufferSize = bufferSize;
1014    }
1015
1016    public void setPeriod(int period) {
1017      this.period = period;
1018    }
1019
1020    public boolean isNomapred() {
1021      return nomapred;
1022    }
1023
1024    public boolean isFilterAll() {
1025      return filterAll;
1026    }
1027
1028    public long getStartRow() {
1029      return startRow;
1030    }
1031
1032    public float getSize() {
1033      return size;
1034    }
1035
1036    public long getPerClientRunRows() {
1037      return perClientRunRows;
1038    }
1039
1040    public int getNumClientThreads() {
1041      return numClientThreads;
1042    }
1043
1044    public long getTotalRows() {
1045      return totalRows;
1046    }
1047
1048    public float getSampleRate() {
1049      return sampleRate;
1050    }
1051
1052    public double getTraceRate() {
1053      return traceRate;
1054    }
1055
1056    public String getTableName() {
1057      return tableName;
1058    }
1059
1060    public boolean isFlushCommits() {
1061      return flushCommits;
1062    }
1063
1064    public boolean isWriteToWAL() {
1065      return writeToWAL;
1066    }
1067
1068    public boolean isAutoFlush() {
1069      return autoFlush;
1070    }
1071
1072    public boolean isUseTags() {
1073      return useTags;
1074    }
1075
1076    public int getNoOfTags() {
1077      return noOfTags;
1078    }
1079
1080    public boolean isReportLatency() {
1081      return reportLatency;
1082    }
1083
1084    public int getMultiGet() {
1085      return multiGet;
1086    }
1087
1088    public int getMultiPut() {
1089      return multiPut;
1090    }
1091
1092    public boolean isInMemoryCF() {
1093      return inMemoryCF;
1094    }
1095
1096    public int getPresplitRegions() {
1097      return presplitRegions;
1098    }
1099
1100    public Compression.Algorithm getCompression() {
1101      return compression;
1102    }
1103
1104    public String getEncryption() {
1105      return encryption;
1106    }
1107
1108    public DataBlockEncoding getBlockEncoding() {
1109      return blockEncoding;
1110    }
1111
1112    public boolean isValueRandom() {
1113      return valueRandom;
1114    }
1115
1116    public int getValueSize() {
1117      return valueSize;
1118    }
1119
1120    public long getPeriod() {
1121      return period;
1122    }
1123
1124    public BloomType getBloomType() {
1125      return bloomType;
1126    }
1127
1128    public int getBlockSize() {
1129      return blockSize;
1130    }
1131
1132    public boolean isOneCon() {
1133      return oneCon;
1134    }
1135
1136    public int getMeasureAfter() {
1137      return measureAfter;
1138    }
1139
1140    public void setMeasureAfter(int measureAfter) {
1141      this.measureAfter = measureAfter;
1142    }
1143
1144    public boolean getAddColumns() {
1145      return addColumns;
1146    }
1147
1148    public void setAddColumns(boolean addColumns) {
1149      this.addColumns = addColumns;
1150    }
1151
1152    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
1153      this.inMemoryCompaction = inMemoryCompaction;
1154    }
1155
1156    public MemoryCompactionPolicy getInMemoryCompaction() {
1157      return this.inMemoryCompaction;
1158    }
1159
1160    public long getBufferSize() {
1161      return this.bufferSize;
1162    }
1163  }
1164
1165  /*
1166   * A test. Subclass to particularize what happens per row.
1167   */
1168  static abstract class TestBase {
1169    private final long everyN;
1170
1171    protected final Configuration conf;
1172    protected final TestOptions opts;
1173
1174    protected final Status status;
1175
1176    private String testName;
1177    protected Histogram latencyHistogram;
1178    private Histogram replicaLatencyHistogram;
1179    private Histogram valueSizeHistogram;
1180    private Histogram rpcCallsHistogram;
1181    private Histogram remoteRpcCallsHistogram;
1182    private Histogram millisBetweenNextHistogram;
1183    private Histogram regionsScannedHistogram;
1184    private Histogram bytesInResultsHistogram;
1185    private Histogram bytesInRemoteResultsHistogram;
1186    private RandomDistribution.Zipf zipf;
1187    private long numOfReplyOverLatencyThreshold = 0;
1188    private long numOfReplyFromReplica = 0;
1189
1190    /**
1191     * Note that all subclasses of this class must provide a public constructor that has the exact
1192     * same list of arguments.
1193     */
1194    TestBase(final Configuration conf, final TestOptions options, final Status status) {
1195      this.conf = conf;
1196      this.opts = options;
1197      this.status = status;
1198      this.testName = this.getClass().getSimpleName();
1199      everyN = (long) (opts.totalRows / (opts.totalRows * opts.sampleRate));
1200      if (options.isValueZipf()) {
1201        this.zipf =
1202          new RandomDistribution.Zipf(ThreadLocalRandom.current(), 1, options.getValueSize(), 1.2);
1203      }
1204      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
1205    }
1206
1207    int getValueLength() {
1208      if (this.opts.isValueRandom()) {
1209        return ThreadLocalRandom.current().nextInt(opts.valueSize);
1210      } else if (this.opts.isValueZipf()) {
1211        return Math.abs(this.zipf.nextInt());
1212      } else {
1213        return opts.valueSize;
1214      }
1215    }
1216
1217    void updateValueSize(final Result[] rs) throws IOException {
1218      updateValueSize(rs, 0);
1219    }
1220
1221    void updateValueSize(final Result[] rs, final long latency) throws IOException {
1222      if (rs == null || (latency == 0)) return;
1223      for (Result r : rs)
1224        updateValueSize(r, latency);
1225    }
1226
1227    void updateValueSize(final Result r) throws IOException {
1228      updateValueSize(r, 0);
1229    }
1230
1231    void updateValueSize(final Result r, final long latency) throws IOException {
1232      if (r == null || (latency == 0)) return;
1233      int size = 0;
1234      // update replicaHistogram
1235      if (r.isStale()) {
1236        replicaLatencyHistogram.update(latency / 1000);
1237        numOfReplyFromReplica++;
1238      }
1239      if (!isRandomValueSize()) return;
1240
1241      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1242        size += scanner.current().getValueLength();
1243      }
1244      updateValueSize(size);
1245    }
1246
1247    void updateValueSize(final int valueSize) {
1248      if (!isRandomValueSize()) return;
1249      this.valueSizeHistogram.update(valueSize);
1250    }
1251
1252    void updateScanMetrics(final ScanMetrics metrics) {
1253      if (metrics == null) return;
1254      Map<String, Long> metricsMap = metrics.getMetricsMap();
1255      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
1256      if (rpcCalls != null) {
1257        this.rpcCallsHistogram.update(rpcCalls.longValue());
1258      }
1259      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
1260      if (remoteRpcCalls != null) {
1261        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
1262      }
1263      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
1264      if (millisBetweenNext != null) {
1265        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
1266      }
1267      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
1268      if (regionsScanned != null) {
1269        this.regionsScannedHistogram.update(regionsScanned.longValue());
1270      }
1271      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
1272      if (bytesInResults != null && bytesInResults.longValue() > 0) {
1273        this.bytesInResultsHistogram.update(bytesInResults.longValue());
1274      }
1275      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
1276      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
1277        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
1278      }
1279    }
1280
1281    String generateStatus(final long sr, final long i, final long lr) {
1282      return "row [start=" + sr + ", current=" + i + ", last=" + lr + "], latency ["
1283        + getShortLatencyReport() + "]"
1284        + (!isRandomValueSize() ? "" : ", value size [" + getShortValueSizeReport() + "]");
1285    }
1286
1287    boolean isRandomValueSize() {
1288      return opts.valueRandom;
1289    }
1290
1291    protected long getReportingPeriod() {
1292      return opts.period;
1293    }
1294
1295    /**
1296     * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1297     */
1298    public Histogram getLatencyHistogram() {
1299      return latencyHistogram;
1300    }
1301
1302    void testSetup() throws IOException {
1303      // test metrics
1304      latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1305      // If it is a replica test, set up histogram for replica.
1306      if (opts.replicas > 1) {
1307        replicaLatencyHistogram =
1308          YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1309      }
1310      valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1311      // scan metrics
1312      rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1313      remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1314      millisBetweenNextHistogram =
1315        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1316      regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1317      bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1318      bytesInRemoteResultsHistogram =
1319        YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1320
1321      onStartup();
1322    }
1323
1324    abstract void onStartup() throws IOException;
1325
1326    void testTakedown() throws IOException {
1327      onTakedown();
1328      // Print all stats for this thread continuously.
1329      // Synchronize on Test.class so different threads don't intermingle the
1330      // output. We can't use 'this' here because each thread has its own instance of Test class.
1331      synchronized (Test.class) {
1332        status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
1333        status
1334          .setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(latencyHistogram));
1335        if (opts.replicas > 1) {
1336          status.setStatus("Latency (us) from Replica Regions: "
1337            + YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram));
1338        }
1339        status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
1340        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
1341        if (valueSizeHistogram.getCount() > 0) {
1342          status.setStatus(
1343            "ValueSize (bytes) : " + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
1344          status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
1345          status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
1346        } else {
1347          status.setStatus("No valueSize statistics available");
1348        }
1349        if (rpcCallsHistogram.getCount() > 0) {
1350          status.setStatus(
1351            "rpcCalls (count): " + YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
1352        }
1353        if (remoteRpcCallsHistogram.getCount() > 0) {
1354          status.setStatus("remoteRpcCalls (count): "
1355            + YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
1356        }
1357        if (millisBetweenNextHistogram.getCount() > 0) {
1358          status.setStatus("millisBetweenNext (latency): "
1359            + YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
1360        }
1361        if (regionsScannedHistogram.getCount() > 0) {
1362          status.setStatus("regionsScanned (count): "
1363            + YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
1364        }
1365        if (bytesInResultsHistogram.getCount() > 0) {
1366          status.setStatus("bytesInResults (size): "
1367            + YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
1368        }
1369        if (bytesInRemoteResultsHistogram.getCount() > 0) {
1370          status.setStatus("bytesInRemoteResults (size): "
1371            + YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
1372        }
1373      }
1374    }
1375
1376    abstract void onTakedown() throws IOException;
1377
1378    /*
1379     * Run test
1380     * @return Elapsed time.
1381     */
1382    long test() throws IOException, InterruptedException {
1383      testSetup();
1384      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1385      final long startTime = System.nanoTime();
1386      try {
1387        testTimed();
1388      } finally {
1389        testTakedown();
1390      }
1391      return (System.nanoTime() - startTime) / 1000000;
1392    }
1393
1394    long getStartRow() {
1395      return opts.startRow;
1396    }
1397
1398    long getLastRow() {
1399      return getStartRow() + opts.perClientRunRows;
1400    }
1401
1402    /**
1403     * Provides an extension point for tests that don't want a per row invocation.
1404     */
1405    void testTimed() throws IOException, InterruptedException {
1406      long startRow = getStartRow();
1407      long lastRow = getLastRow();
1408      // Report on completion of 1/10th of total.
1409      for (int ii = 0; ii < opts.cycles; ii++) {
1410        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
1411        for (long i = startRow; i < lastRow; i++) {
1412          if (i % everyN != 0) continue;
1413          long startTime = System.nanoTime();
1414          boolean requestSent = false;
1415          Span span = TraceUtil.getGlobalTracer().spanBuilder("test row").startSpan();
1416          try (Scope scope = span.makeCurrent()) {
1417            requestSent = testRow(i, startTime);
1418          } finally {
1419            span.end();
1420          }
1421          if ((i - startRow) > opts.measureAfter) {
1422            // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
1423            // first 9 times and sends the actual get request in the 10th iteration.
1424            // We should only set latency when actual request is sent because otherwise
1425            // it turns out to be 0.
1426            if (requestSent) {
1427              long latency = (System.nanoTime() - startTime) / 1000;
1428              latencyHistogram.update(latency);
1429              if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) {
1430                numOfReplyOverLatencyThreshold++;
1431              }
1432            }
1433            if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1434              status.setStatus(generateStatus(startRow, i, lastRow));
1435            }
1436          }
1437        }
1438      }
1439    }
1440
1441    /** Returns Subset of the histograms' calculation. */
1442    public String getShortLatencyReport() {
1443      return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
1444    }
1445
1446    /** Returns Subset of the histograms' calculation. */
1447    public String getShortValueSizeReport() {
1448      return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
1449    }
1450
1451    /**
1452     * Test for individual row.
1453     * @param i Row index.
1454     * @return true if the row was sent to server and need to record metrics. False if not, multiGet
1455     *         and multiPut e.g., the rows are sent to server only if enough gets/puts are gathered.
1456     */
1457    abstract boolean testRow(final long i, final long startTime)
1458      throws IOException, InterruptedException;
1459  }
1460
1461  static abstract class Test extends TestBase {
1462    protected Connection connection;
1463
1464    Test(final Connection con, final TestOptions options, final Status status) {
1465      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1466      this.connection = con;
1467    }
1468  }
1469
1470  static abstract class AsyncTest extends TestBase {
1471    protected AsyncConnection connection;
1472
1473    AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
1474      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1475      this.connection = con;
1476    }
1477  }
1478
1479  static abstract class TableTest extends Test {
1480    protected Table table;
1481
1482    TableTest(Connection con, TestOptions options, Status status) {
1483      super(con, options, status);
1484    }
1485
1486    @Override
1487    void onStartup() throws IOException {
1488      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1489    }
1490
1491    @Override
1492    void onTakedown() throws IOException {
1493      table.close();
1494    }
1495  }
1496
1497  /*
1498   * Parent class for all meta tests: MetaWriteTest, MetaRandomReadTest and CleanMetaTest
1499   */
1500  static abstract class MetaTest extends TableTest {
1501    protected int keyLength;
1502
1503    MetaTest(Connection con, TestOptions options, Status status) {
1504      super(con, options, status);
1505      keyLength = Long.toString(opts.perClientRunRows).length();
1506    }
1507
1508    @Override
1509    void onTakedown() throws IOException {
1510      // No clean up
1511    }
1512
1513    /*
1514     * Generates Lexicographically ascending strings
1515     */
1516    protected byte[] getSplitKey(final long i) {
1517      return Bytes.toBytes(String.format("%0" + keyLength + "d", i));
1518    }
1519
1520  }
1521
1522  static abstract class AsyncTableTest extends AsyncTest {
1523    protected AsyncTable<?> table;
1524
1525    AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
1526      super(con, options, status);
1527    }
1528
1529    @Override
1530    void onStartup() throws IOException {
1531      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1532    }
1533
1534    @Override
1535    void onTakedown() throws IOException {
1536    }
1537  }
1538
1539  static class AsyncRandomReadTest extends AsyncTableTest {
1540    private final Consistency consistency;
1541    private ArrayList<Get> gets;
1542
1543    AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
1544      super(con, options, status);
1545      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1546      if (opts.multiGet > 0) {
1547        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1548        this.gets = new ArrayList<>(opts.multiGet);
1549      }
1550    }
1551
1552    @Override
1553    boolean testRow(final long i, final long startTime) throws IOException, InterruptedException {
1554      if (opts.randomSleep > 0) {
1555        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
1556      }
1557      Get get = new Get(getRandomRow(opts.totalRows));
1558      for (int family = 0; family < opts.families; family++) {
1559        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1560        if (opts.addColumns) {
1561          for (int column = 0; column < opts.columns; column++) {
1562            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1563            get.addColumn(familyName, qualifier);
1564          }
1565        } else {
1566          get.addFamily(familyName);
1567        }
1568      }
1569      if (opts.filterAll) {
1570        get.setFilter(new FilterAllFilter());
1571      }
1572      get.setConsistency(consistency);
1573      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1574      try {
1575        if (opts.multiGet > 0) {
1576          this.gets.add(get);
1577          if (this.gets.size() == opts.multiGet) {
1578            Result[] rs =
1579              this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
1580            updateValueSize(rs);
1581            this.gets.clear();
1582          } else {
1583            return false;
1584          }
1585        } else {
1586          updateValueSize(this.table.get(get).get());
1587        }
1588      } catch (ExecutionException e) {
1589        throw new IOException(e);
1590      }
1591      return true;
1592    }
1593
1594    public static RuntimeException runtime(Throwable e) {
1595      if (e instanceof RuntimeException) {
1596        return (RuntimeException) e;
1597      }
1598      return new RuntimeException(e);
1599    }
1600
1601    public static <V> V propagate(Callable<V> callable) {
1602      try {
1603        return callable.call();
1604      } catch (Exception e) {
1605        throw runtime(e);
1606      }
1607    }
1608
1609    @Override
1610    protected long getReportingPeriod() {
1611      long period = opts.perClientRunRows / 10;
1612      return period == 0 ? opts.perClientRunRows : period;
1613    }
1614
1615    @Override
1616    protected void testTakedown() throws IOException {
1617      if (this.gets != null && this.gets.size() > 0) {
1618        this.table.get(gets);
1619        this.gets.clear();
1620      }
1621      super.testTakedown();
1622    }
1623  }
1624
1625  static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
1626
1627    AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
1628      super(con, options, status);
1629    }
1630
1631    @Override
1632    protected byte[] generateRow(final long i) {
1633      return getRandomRow(opts.totalRows);
1634    }
1635  }
1636
1637  static class AsyncScanTest extends AsyncTableTest {
1638    private ResultScanner testScanner;
1639    private AsyncTable<?> asyncTable;
1640
1641    AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
1642      super(con, options, status);
1643    }
1644
1645    @Override
1646    void onStartup() throws IOException {
1647      this.asyncTable = connection.getTable(TableName.valueOf(opts.tableName),
1648        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
1649    }
1650
1651    @Override
1652    void testTakedown() throws IOException {
1653      if (this.testScanner != null) {
1654        updateScanMetrics(this.testScanner.getScanMetrics());
1655        this.testScanner.close();
1656      }
1657      super.testTakedown();
1658    }
1659
1660    @Override
1661    boolean testRow(final long i, final long startTime) throws IOException {
1662      if (this.testScanner == null) {
1663        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1664          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1665          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1666        for (int family = 0; family < opts.families; family++) {
1667          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1668          if (opts.addColumns) {
1669            for (int column = 0; column < opts.columns; column++) {
1670              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1671              scan.addColumn(familyName, qualifier);
1672            }
1673          } else {
1674            scan.addFamily(familyName);
1675          }
1676        }
1677        if (opts.filterAll) {
1678          scan.setFilter(new FilterAllFilter());
1679        }
1680        this.testScanner = asyncTable.getScanner(scan);
1681      }
1682      Result r = testScanner.next();
1683      updateValueSize(r);
1684      return true;
1685    }
1686  }
1687
1688  static class AsyncSequentialReadTest extends AsyncTableTest {
1689    AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
1690      super(con, options, status);
1691    }
1692
1693    @Override
1694    boolean testRow(final long i, final long startTime) throws IOException, InterruptedException {
1695      Get get = new Get(format(i));
1696      for (int family = 0; family < opts.families; family++) {
1697        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1698        if (opts.addColumns) {
1699          for (int column = 0; column < opts.columns; column++) {
1700            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1701            get.addColumn(familyName, qualifier);
1702          }
1703        } else {
1704          get.addFamily(familyName);
1705        }
1706      }
1707      if (opts.filterAll) {
1708        get.setFilter(new FilterAllFilter());
1709      }
1710      try {
1711        updateValueSize(table.get(get).get());
1712      } catch (ExecutionException e) {
1713        throw new IOException(e);
1714      }
1715      return true;
1716    }
1717  }
1718
1719  static class AsyncSequentialWriteTest extends AsyncTableTest {
1720    private ArrayList<Put> puts;
1721
1722    AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
1723      super(con, options, status);
1724      if (opts.multiPut > 0) {
1725        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
1726        this.puts = new ArrayList<>(opts.multiPut);
1727      }
1728    }
1729
1730    protected byte[] generateRow(final long i) {
1731      return format(i);
1732    }
1733
1734    @Override
1735    @SuppressWarnings("ReturnValueIgnored")
1736    boolean testRow(final long i, final long startTime) throws IOException, InterruptedException {
1737      byte[] row = generateRow(i);
1738      Put put = new Put(row);
1739      for (int family = 0; family < opts.families; family++) {
1740        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1741        for (int column = 0; column < opts.columns; column++) {
1742          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1743          byte[] value = generateData(getValueLength());
1744          if (opts.useTags) {
1745            byte[] tag = generateData(TAG_LENGTH);
1746            Tag[] tags = new Tag[opts.noOfTags];
1747            for (int n = 0; n < opts.noOfTags; n++) {
1748              Tag t = new ArrayBackedTag((byte) n, tag);
1749              tags[n] = t;
1750            }
1751            KeyValue kv =
1752              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
1753            put.add(kv);
1754            updateValueSize(kv.getValueLength());
1755          } else {
1756            put.addColumn(familyName, qualifier, value);
1757            updateValueSize(value.length);
1758          }
1759        }
1760      }
1761      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1762      try {
1763        table.put(put).get();
1764        if (opts.multiPut > 0) {
1765          this.puts.add(put);
1766          if (this.puts.size() == opts.multiPut) {
1767            this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get));
1768            this.puts.clear();
1769          } else {
1770            return false;
1771          }
1772        } else {
1773          table.put(put).get();
1774        }
1775      } catch (ExecutionException e) {
1776        throw new IOException(e);
1777      }
1778      return true;
1779    }
1780  }
1781
1782  static abstract class BufferedMutatorTest extends Test {
1783    protected BufferedMutator mutator;
1784    protected Table table;
1785
1786    BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1787      super(con, options, status);
1788    }
1789
1790    @Override
1791    void onStartup() throws IOException {
1792      BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1793      p.writeBufferSize(opts.bufferSize);
1794      this.mutator = connection.getBufferedMutator(p);
1795      this.table = connection.getTable(TableName.valueOf(opts.tableName));
1796    }
1797
1798    @Override
1799    void onTakedown() throws IOException {
1800      mutator.close();
1801      table.close();
1802    }
1803  }
1804
1805  static class RandomSeekScanTest extends TableTest {
1806    RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1807      super(con, options, status);
1808    }
1809
1810    @Override
1811    boolean testRow(final long i, final long startTime) throws IOException {
1812      Scan scan = new Scan().withStartRow(getRandomRow(opts.totalRows)).setCaching(opts.caching)
1813        .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1814        .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1815      FilterList list = new FilterList();
1816      for (int family = 0; family < opts.families; family++) {
1817        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1818        if (opts.addColumns) {
1819          for (int column = 0; column < opts.columns; column++) {
1820            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1821            scan.addColumn(familyName, qualifier);
1822          }
1823        } else {
1824          scan.addFamily(familyName);
1825        }
1826      }
1827      if (opts.filterAll) {
1828        list.addFilter(new FilterAllFilter());
1829      }
1830      list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1831      scan.setFilter(list);
1832      ResultScanner s = this.table.getScanner(scan);
1833      try {
1834        for (Result rr; (rr = s.next()) != null;) {
1835          updateValueSize(rr);
1836        }
1837      } finally {
1838        updateScanMetrics(s.getScanMetrics());
1839        s.close();
1840      }
1841      return true;
1842    }
1843
1844    @Override
1845    protected long getReportingPeriod() {
1846      long period = opts.perClientRunRows / 100;
1847      return period == 0 ? opts.perClientRunRows : period;
1848    }
1849
1850  }
1851
1852  static abstract class RandomScanWithRangeTest extends TableTest {
1853    RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1854      super(con, options, status);
1855    }
1856
1857    @Override
1858    boolean testRow(final long i, final long startTime) throws IOException {
1859      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1860      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
1861        .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
1862        .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1863        .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1864      for (int family = 0; family < opts.families; family++) {
1865        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1866        if (opts.addColumns) {
1867          for (int column = 0; column < opts.columns; column++) {
1868            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1869            scan.addColumn(familyName, qualifier);
1870          }
1871        } else {
1872          scan.addFamily(familyName);
1873        }
1874      }
1875      if (opts.filterAll) {
1876        scan.setFilter(new FilterAllFilter());
1877      }
1878      Result r = null;
1879      int count = 0;
1880      ResultScanner s = this.table.getScanner(scan);
1881      try {
1882        for (; (r = s.next()) != null;) {
1883          updateValueSize(r);
1884          count++;
1885        }
1886        if (i % 100 == 0) {
1887          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1888            Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()),
1889            count));
1890        }
1891      } finally {
1892        updateScanMetrics(s.getScanMetrics());
1893        s.close();
1894      }
1895      return true;
1896    }
1897
1898    protected abstract Pair<byte[], byte[]> getStartAndStopRow();
1899
1900    protected Pair<byte[], byte[]> generateStartAndStopRows(long maxRange) {
1901      long start = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE) % opts.totalRows;
1902      long stop = start + maxRange;
1903      return new Pair<>(format(start), format(stop));
1904    }
1905
1906    @Override
1907    protected long getReportingPeriod() {
1908      long period = opts.perClientRunRows / 100;
1909      return period == 0 ? opts.perClientRunRows : period;
1910    }
1911  }
1912
1913  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1914    RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1915      super(con, options, status);
1916    }
1917
1918    @Override
1919    protected Pair<byte[], byte[]> getStartAndStopRow() {
1920      return generateStartAndStopRows(10);
1921    }
1922  }
1923
1924  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1925    RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1926      super(con, options, status);
1927    }
1928
1929    @Override
1930    protected Pair<byte[], byte[]> getStartAndStopRow() {
1931      return generateStartAndStopRows(100);
1932    }
1933  }
1934
1935  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1936    RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1937      super(con, options, status);
1938    }
1939
1940    @Override
1941    protected Pair<byte[], byte[]> getStartAndStopRow() {
1942      return generateStartAndStopRows(1000);
1943    }
1944  }
1945
1946  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1947    RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1948      super(con, options, status);
1949    }
1950
1951    @Override
1952    protected Pair<byte[], byte[]> getStartAndStopRow() {
1953      return generateStartAndStopRows(10000);
1954    }
1955  }
1956
1957  static class RandomReadTest extends TableTest {
1958    private final Consistency consistency;
1959    private ArrayList<Get> gets;
1960
1961    RandomReadTest(Connection con, TestOptions options, Status status) {
1962      super(con, options, status);
1963      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1964      if (opts.multiGet > 0) {
1965        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1966        this.gets = new ArrayList<>(opts.multiGet);
1967      }
1968    }
1969
1970    @Override
1971    boolean testRow(final long i, final long startTime) throws IOException, InterruptedException {
1972      if (opts.randomSleep > 0) {
1973        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
1974      }
1975      Get get = new Get(getRandomRow(opts.totalRows));
1976      for (int family = 0; family < opts.families; family++) {
1977        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1978        if (opts.addColumns) {
1979          for (int column = 0; column < opts.columns; column++) {
1980            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
1981            get.addColumn(familyName, qualifier);
1982          }
1983        } else {
1984          get.addFamily(familyName);
1985        }
1986      }
1987      if (opts.filterAll) {
1988        get.setFilter(new FilterAllFilter());
1989      }
1990      get.setConsistency(consistency);
1991      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1992      if (opts.multiGet > 0) {
1993        this.gets.add(get);
1994        if (this.gets.size() == opts.multiGet) {
1995          Result[] rs = this.table.get(this.gets);
1996          if (opts.replicas > 1) {
1997            long latency = System.nanoTime() - startTime;
1998            updateValueSize(rs, latency);
1999          } else {
2000            updateValueSize(rs);
2001          }
2002          this.gets.clear();
2003        } else {
2004          return false;
2005        }
2006      } else {
2007        if (opts.replicas > 1) {
2008          Result r = this.table.get(get);
2009          long latency = System.nanoTime() - startTime;
2010          updateValueSize(r, latency);
2011        } else {
2012          updateValueSize(this.table.get(get));
2013        }
2014      }
2015      return true;
2016    }
2017
2018    @Override
2019    protected long getReportingPeriod() {
2020      long period = opts.perClientRunRows / 10;
2021      return period == 0 ? opts.perClientRunRows : period;
2022    }
2023
2024    @Override
2025    protected void testTakedown() throws IOException {
2026      if (this.gets != null && this.gets.size() > 0) {
2027        this.table.get(gets);
2028        this.gets.clear();
2029      }
2030      super.testTakedown();
2031    }
2032  }
2033
2034  /*
2035   * Send random reads against fake regions inserted by MetaWriteTest
2036   */
2037  static class MetaRandomReadTest extends MetaTest {
2038    private RegionLocator regionLocator;
2039
2040    MetaRandomReadTest(Connection con, TestOptions options, Status status) {
2041      super(con, options, status);
2042      LOG.info("call getRegionLocation");
2043    }
2044
2045    @Override
2046    void onStartup() throws IOException {
2047      super.onStartup();
2048      this.regionLocator = connection.getRegionLocator(table.getName());
2049    }
2050
2051    @Override
2052    boolean testRow(final long i, final long startTime) throws IOException, InterruptedException {
2053      if (opts.randomSleep > 0) {
2054        Thread.sleep(ThreadLocalRandom.current().nextInt(opts.randomSleep));
2055      }
2056      HRegionLocation hRegionLocation = regionLocator.getRegionLocation(
2057        getSplitKey(ThreadLocalRandom.current().nextLong(opts.perClientRunRows)), true);
2058      LOG.debug("get location for region: " + hRegionLocation);
2059      return true;
2060    }
2061
2062    @Override
2063    protected long getReportingPeriod() {
2064      long period = opts.perClientRunRows / 10;
2065      return period == 0 ? opts.perClientRunRows : period;
2066    }
2067
2068    @Override
2069    protected void testTakedown() throws IOException {
2070      super.testTakedown();
2071    }
2072  }
2073
2074  static class RandomWriteTest extends SequentialWriteTest {
2075    RandomWriteTest(Connection con, TestOptions options, Status status) {
2076      super(con, options, status);
2077    }
2078
2079    @Override
2080    protected byte[] generateRow(final long i) {
2081      return getRandomRow(opts.totalRows);
2082    }
2083
2084  }
2085
2086  static class RandomDeleteTest extends SequentialDeleteTest {
2087    RandomDeleteTest(Connection con, TestOptions options, Status status) {
2088      super(con, options, status);
2089    }
2090
2091    @Override
2092    protected byte[] generateRow(final long i) {
2093      return getRandomRow(opts.totalRows);
2094    }
2095
2096  }
2097
2098  static class ScanTest extends TableTest {
2099    private ResultScanner testScanner;
2100
2101    ScanTest(Connection con, TestOptions options, Status status) {
2102      super(con, options, status);
2103    }
2104
2105    @Override
2106    void testTakedown() throws IOException {
2107      if (this.testScanner != null) {
2108        this.testScanner.close();
2109      }
2110      super.testTakedown();
2111    }
2112
2113    @Override
2114    boolean testRow(final long i, final long startTime) throws IOException {
2115      if (this.testScanner == null) {
2116        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
2117          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
2118          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
2119        for (int family = 0; family < opts.families; family++) {
2120          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2121          if (opts.addColumns) {
2122            for (int column = 0; column < opts.columns; column++) {
2123              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2124              scan.addColumn(familyName, qualifier);
2125            }
2126          } else {
2127            scan.addFamily(familyName);
2128          }
2129        }
2130        if (opts.filterAll) {
2131          scan.setFilter(new FilterAllFilter());
2132        }
2133        this.testScanner = table.getScanner(scan);
2134      }
2135      Result r = testScanner.next();
2136      updateValueSize(r);
2137      return true;
2138    }
2139  }
2140
2141  static class ReverseScanTest extends TableTest {
2142    private ResultScanner testScanner;
2143
2144    ReverseScanTest(Connection con, TestOptions options, Status status) {
2145      super(con, options, status);
2146    }
2147
2148    @Override
2149    void testTakedown() throws IOException {
2150      if (this.testScanner != null) {
2151        this.testScanner.close();
2152      }
2153      super.testTakedown();
2154    }
2155
2156    @Override
2157    boolean testRow(final long i, final long startTime) throws IOException {
2158      if (this.testScanner == null) {
2159        Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2160          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2161          .setScanMetricsEnabled(true).setReversed(true);
2162        for (int family = 0; family < opts.families; family++) {
2163          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2164          if (opts.addColumns) {
2165            for (int column = 0; column < opts.columns; column++) {
2166              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2167              scan.addColumn(familyName, qualifier);
2168            }
2169          } else {
2170            scan.addFamily(familyName);
2171          }
2172        }
2173        if (opts.filterAll) {
2174          scan.setFilter(new FilterAllFilter());
2175        }
2176        this.testScanner = table.getScanner(scan);
2177      }
2178      Result r = testScanner.next();
2179      updateValueSize(r);
2180      return true;
2181    }
2182  }
2183
2184  /**
2185   * Base class for operations that are CAS-like; that read a value and then set it based off what
2186   * they read. In this category is increment, append, checkAndPut, etc.
2187   * <p>
2188   * These operations also want some concurrency going on. Usually when these tests run, they
2189   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
2190   * same key space. We do this with our getStartRow and getLastRow overrides.
2191   */
2192  static abstract class CASTableTest extends TableTest {
2193    private final byte[] qualifier;
2194
2195    CASTableTest(Connection con, TestOptions options, Status status) {
2196      super(con, options, status);
2197      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
2198    }
2199
2200    byte[] getQualifier() {
2201      return this.qualifier;
2202    }
2203
2204    @Override
2205    long getStartRow() {
2206      return 0;
2207    }
2208
2209    @Override
2210    long getLastRow() {
2211      return opts.perClientRunRows;
2212    }
2213  }
2214
2215  static class IncrementTest extends CASTableTest {
2216    IncrementTest(Connection con, TestOptions options, Status status) {
2217      super(con, options, status);
2218    }
2219
2220    @Override
2221    boolean testRow(final long i, final long startTime) throws IOException {
2222      Increment increment = new Increment(format(i));
2223      // unlike checkAndXXX tests, which make most sense to do on a single value,
2224      // if multiple families are specified for an increment test we assume it is
2225      // meant to raise the work factor
2226      for (int family = 0; family < opts.families; family++) {
2227        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2228        increment.addColumn(familyName, getQualifier(), 1l);
2229      }
2230      updateValueSize(this.table.increment(increment));
2231      return true;
2232    }
2233  }
2234
2235  static class AppendTest extends CASTableTest {
2236    AppendTest(Connection con, TestOptions options, Status status) {
2237      super(con, options, status);
2238    }
2239
2240    @Override
2241    boolean testRow(final long i, final long startTime) throws IOException {
2242      byte[] bytes = format(i);
2243      Append append = new Append(bytes);
2244      // unlike checkAndXXX tests, which make most sense to do on a single value,
2245      // if multiple families are specified for an append test we assume it is
2246      // meant to raise the work factor
2247      for (int family = 0; family < opts.families; family++) {
2248        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2249        append.addColumn(familyName, getQualifier(), bytes);
2250      }
2251      updateValueSize(this.table.append(append));
2252      return true;
2253    }
2254  }
2255
2256  static class CheckAndMutateTest extends CASTableTest {
2257    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
2258      super(con, options, status);
2259    }
2260
2261    @Override
2262    boolean testRow(final long i, final long startTime) throws IOException {
2263      final byte[] bytes = format(i);
2264      // checkAndXXX tests operate on only a single value
2265      // Put a known value so when we go to check it, it is there.
2266      Put put = new Put(bytes);
2267      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2268      this.table.put(put);
2269      RowMutations mutations = new RowMutations(bytes);
2270      mutations.add(put);
2271      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2272        .thenMutate(mutations);
2273      return true;
2274    }
2275  }
2276
2277  static class CheckAndPutTest extends CASTableTest {
2278    CheckAndPutTest(Connection con, TestOptions options, Status status) {
2279      super(con, options, status);
2280    }
2281
2282    @Override
2283    boolean testRow(final long i, final long startTime) throws IOException {
2284      final byte[] bytes = format(i);
2285      // checkAndXXX tests operate on only a single value
2286      // Put a known value so when we go to check it, it is there.
2287      Put put = new Put(bytes);
2288      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2289      this.table.put(put);
2290      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2291        .thenPut(put);
2292      return true;
2293    }
2294  }
2295
2296  static class CheckAndDeleteTest extends CASTableTest {
2297    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
2298      super(con, options, status);
2299    }
2300
2301    @Override
2302    boolean testRow(final long i, final long startTime) throws IOException {
2303      final byte[] bytes = format(i);
2304      // checkAndXXX tests operate on only a single value
2305      // Put a known value so when we go to check it, it is there.
2306      Put put = new Put(bytes);
2307      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2308      this.table.put(put);
2309      Delete delete = new Delete(put.getRow());
2310      delete.addColumn(FAMILY_ZERO, getQualifier());
2311      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier()).ifEquals(bytes)
2312        .thenDelete(delete);
2313      return true;
2314    }
2315  }
2316
2317  /*
2318   * Delete all fake regions inserted to meta table by MetaWriteTest.
2319   */
2320  static class CleanMetaTest extends MetaTest {
2321    CleanMetaTest(Connection con, TestOptions options, Status status) {
2322      super(con, options, status);
2323    }
2324
2325    @Override
2326    boolean testRow(final long i, final long startTime) throws IOException {
2327      try {
2328        RegionInfo regionInfo = connection.getRegionLocator(table.getName())
2329          .getRegionLocation(getSplitKey(i), false).getRegion();
2330        LOG.debug("deleting region from meta: " + regionInfo);
2331
2332        Delete delete =
2333          MetaTableAccessor.makeDeleteFromRegionInfo(regionInfo, HConstants.LATEST_TIMESTAMP);
2334        try (Table t = MetaTableAccessor.getMetaHTable(connection)) {
2335          t.delete(delete);
2336        }
2337      } catch (IOException ie) {
2338        // Log and continue
2339        LOG.error("cannot find region with start key: " + i);
2340      }
2341      return true;
2342    }
2343  }
2344
2345  static class SequentialReadTest extends TableTest {
2346    SequentialReadTest(Connection con, TestOptions options, Status status) {
2347      super(con, options, status);
2348    }
2349
2350    @Override
2351    boolean testRow(final long i, final long startTime) throws IOException {
2352      Get get = new Get(format(i));
2353      for (int family = 0; family < opts.families; family++) {
2354        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2355        if (opts.addColumns) {
2356          for (int column = 0; column < opts.columns; column++) {
2357            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2358            get.addColumn(familyName, qualifier);
2359          }
2360        } else {
2361          get.addFamily(familyName);
2362        }
2363      }
2364      if (opts.filterAll) {
2365        get.setFilter(new FilterAllFilter());
2366      }
2367      updateValueSize(table.get(get));
2368      return true;
2369    }
2370  }
2371
2372  static class SequentialWriteTest extends BufferedMutatorTest {
2373    private ArrayList<Put> puts;
2374
2375    SequentialWriteTest(Connection con, TestOptions options, Status status) {
2376      super(con, options, status);
2377      if (opts.multiPut > 0) {
2378        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
2379        this.puts = new ArrayList<>(opts.multiPut);
2380      }
2381    }
2382
2383    protected byte[] generateRow(final long i) {
2384      return format(i);
2385    }
2386
2387    @Override
2388    boolean testRow(final long i, final long startTime) throws IOException {
2389      byte[] row = generateRow(i);
2390      Put put = new Put(row);
2391      for (int family = 0; family < opts.families; family++) {
2392        byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
2393        for (int column = 0; column < opts.columns; column++) {
2394          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2395          byte[] value = generateData(getValueLength());
2396          if (opts.useTags) {
2397            byte[] tag = generateData(TAG_LENGTH);
2398            Tag[] tags = new Tag[opts.noOfTags];
2399            for (int n = 0; n < opts.noOfTags; n++) {
2400              Tag t = new ArrayBackedTag((byte) n, tag);
2401              tags[n] = t;
2402            }
2403            KeyValue kv =
2404              new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP, value, tags);
2405            put.add(kv);
2406            updateValueSize(kv.getValueLength());
2407          } else {
2408            put.addColumn(familyName, qualifier, value);
2409            updateValueSize(value.length);
2410          }
2411        }
2412      }
2413      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2414      if (opts.autoFlush) {
2415        if (opts.multiPut > 0) {
2416          this.puts.add(put);
2417          if (this.puts.size() == opts.multiPut) {
2418            table.put(this.puts);
2419            this.puts.clear();
2420          } else {
2421            return false;
2422          }
2423        } else {
2424          table.put(put);
2425        }
2426      } else {
2427        mutator.mutate(put);
2428      }
2429      return true;
2430    }
2431  }
2432
2433  static class SequentialDeleteTest extends BufferedMutatorTest {
2434
2435    SequentialDeleteTest(Connection con, TestOptions options, Status status) {
2436      super(con, options, status);
2437    }
2438
2439    protected byte[] generateRow(final long i) {
2440      return format(i);
2441    }
2442
2443    @Override
2444    boolean testRow(final long i, final long startTime) throws IOException {
2445      byte[] row = generateRow(i);
2446      Delete delete = new Delete(row);
2447      for (int family = 0; family < opts.families; family++) {
2448        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2449        delete.addFamily(familyName);
2450      }
2451      delete.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2452      if (opts.autoFlush) {
2453        table.delete(delete);
2454      } else {
2455        mutator.mutate(delete);
2456      }
2457      return true;
2458    }
2459  }
2460
2461  /*
2462   * Insert fake regions into meta table with contiguous split keys.
2463   */
2464  static class MetaWriteTest extends MetaTest {
2465
2466    MetaWriteTest(Connection con, TestOptions options, Status status) {
2467      super(con, options, status);
2468    }
2469
2470    @Override
2471    boolean testRow(final long i, final long startTime) throws IOException {
2472      List<RegionInfo> regionInfos = new ArrayList<RegionInfo>();
2473      RegionInfo regionInfo = (RegionInfoBuilder.newBuilder(TableName.valueOf(TABLE_NAME))
2474        .setStartKey(getSplitKey(i)).setEndKey(getSplitKey(i + 1)).build());
2475      regionInfos.add(regionInfo);
2476      MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 1);
2477
2478      // write the serverName columns
2479      MetaTableAccessor.updateRegionLocation(connection, regionInfo,
2480        ServerName.valueOf("localhost", 60010, ThreadLocalRandom.current().nextLong()), i,
2481        EnvironmentEdgeManager.currentTime());
2482      return true;
2483    }
2484  }
2485
2486  static class FilteredScanTest extends TableTest {
2487    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
2488
2489    FilteredScanTest(Connection con, TestOptions options, Status status) {
2490      super(con, options, status);
2491      if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) {
2492        LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB
2493          + ". This could take a very long time.");
2494      }
2495    }
2496
2497    @Override
2498    boolean testRow(long i, final long startTime) throws IOException {
2499      byte[] value = generateData(getValueLength());
2500      Scan scan = constructScan(value);
2501      ResultScanner scanner = null;
2502      try {
2503        scanner = this.table.getScanner(scan);
2504        for (Result r = null; (r = scanner.next()) != null;) {
2505          updateValueSize(r);
2506        }
2507      } finally {
2508        if (scanner != null) {
2509          updateScanMetrics(scanner.getScanMetrics());
2510          scanner.close();
2511        }
2512      }
2513      return true;
2514    }
2515
2516    protected Scan constructScan(byte[] valuePrefix) throws IOException {
2517      FilterList list = new FilterList();
2518      Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO, CompareOperator.EQUAL,
2519        new BinaryComparator(valuePrefix));
2520      list.addFilter(filter);
2521      if (opts.filterAll) {
2522        list.addFilter(new FilterAllFilter());
2523      }
2524      Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2525        .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2526        .setScanMetricsEnabled(true);
2527      if (opts.addColumns) {
2528        for (int column = 0; column < opts.columns; column++) {
2529          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
2530          scan.addColumn(FAMILY_ZERO, qualifier);
2531        }
2532      } else {
2533        scan.addFamily(FAMILY_ZERO);
2534      }
2535      scan.setFilter(list);
2536      return scan;
2537    }
2538  }
2539
2540  /**
2541   * Compute a throughput rate in MB/s.
2542   * @param rows   Number of records consumed.
2543   * @param timeMs Time taken in milliseconds.
2544   * @return String value with label, ie '123.76 MB/s'
2545   */
2546  private static String calculateMbps(long rows, long timeMs, final int valueSize, int families,
2547    int columns) {
2548    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH
2549      + ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families);
2550    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
2551      .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT).divide(BYTES_PER_MB, CXT);
2552    return FMT.format(mbps) + " MB/s";
2553  }
2554
2555  /*
2556   * Format passed integer.
2557   * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed number (Does
2558   * absolute in case number is negative).
2559   */
2560  public static byte[] format(final long number) {
2561    byte[] b = new byte[ROW_LENGTH];
2562    long d = Math.abs(number);
2563    for (int i = b.length - 1; i >= 0; i--) {
2564      b[i] = (byte) ((d % 10) + '0');
2565      d /= 10;
2566    }
2567    return b;
2568  }
2569
2570  /*
2571   * This method takes some time and is done inline uploading data. For example, doing the mapfile
2572   * test, generation of the key and value consumes about 30% of CPU time.
2573   * @return Generated random value to insert into a table cell.
2574   */
2575  public static byte[] generateData(int length) {
2576    byte[] b = new byte[length];
2577    int i;
2578
2579    Random r = ThreadLocalRandom.current();
2580    for (i = 0; i < (length - 8); i += 8) {
2581      b[i] = (byte) (65 + r.nextInt(26));
2582      b[i + 1] = b[i];
2583      b[i + 2] = b[i];
2584      b[i + 3] = b[i];
2585      b[i + 4] = b[i];
2586      b[i + 5] = b[i];
2587      b[i + 6] = b[i];
2588      b[i + 7] = b[i];
2589    }
2590
2591    byte a = (byte) (65 + r.nextInt(26));
2592    for (; i < length; i++) {
2593      b[i] = a;
2594    }
2595    return b;
2596  }
2597
2598  static byte[] getRandomRow(final long totalRows) {
2599    return format(generateRandomRow(totalRows));
2600  }
2601
2602  static long generateRandomRow(final long totalRows) {
2603    return ThreadLocalRandom.current().nextLong(Long.MAX_VALUE) % totalRows;
2604  }
2605
2606  static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
2607    Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
2608    throws IOException, InterruptedException {
2609    status.setStatus(
2610      "Start " + cmd + " at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows");
2611    long totalElapsedTime;
2612
2613    final TestBase t;
2614    try {
2615      if (AsyncTest.class.isAssignableFrom(cmd)) {
2616        Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
2617        Constructor<? extends AsyncTest> constructor =
2618          newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
2619        t = constructor.newInstance(asyncCon, opts, status);
2620      } else {
2621        Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
2622        Constructor<? extends Test> constructor =
2623          newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
2624        t = constructor.newInstance(con, opts, status);
2625      }
2626    } catch (NoSuchMethodException e) {
2627      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
2628        + ".  It does not provide a constructor as described by "
2629        + "the javadoc comment.  Available constructors are: "
2630        + Arrays.toString(cmd.getConstructors()));
2631    } catch (Exception e) {
2632      throw new IllegalStateException("Failed to construct command class", e);
2633    }
2634    totalElapsedTime = t.test();
2635
2636    status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + opts.startRow
2637      + " for " + opts.perClientRunRows + " rows" + " ("
2638      + calculateMbps((long) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
2639        getAverageValueLength(opts), opts.families, opts.columns)
2640      + ")");
2641
2642    return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold,
2643      t.numOfReplyFromReplica, t.getLatencyHistogram());
2644  }
2645
2646  private static int getAverageValueLength(final TestOptions opts) {
2647    return opts.valueRandom ? opts.valueSize / 2 : opts.valueSize;
2648  }
2649
2650  private void runTest(final Class<? extends TestBase> cmd, TestOptions opts)
2651    throws IOException, InterruptedException, ClassNotFoundException, ExecutionException {
2652    // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2653    // the TestOptions introspection for us and dump the output in a readable format.
2654    LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
2655    Admin admin = null;
2656    Connection connection = null;
2657    try {
2658      connection = ConnectionFactory.createConnection(getConf());
2659      admin = connection.getAdmin();
2660      checkTable(admin, opts);
2661    } finally {
2662      if (admin != null) admin.close();
2663      if (connection != null) connection.close();
2664    }
2665    if (opts.nomapred) {
2666      doLocalClients(opts, getConf());
2667    } else {
2668      doMapReduce(opts, getConf());
2669    }
2670  }
2671
2672  protected void printUsage() {
2673    printUsage(PE_COMMAND_SHORTNAME, null);
2674  }
2675
2676  protected static void printUsage(final String message) {
2677    printUsage(PE_COMMAND_SHORTNAME, message);
2678  }
2679
2680  protected static void printUsageAndExit(final String message, final int exitCode) {
2681    printUsage(message);
2682    System.exit(exitCode);
2683  }
2684
2685  protected static void printUsage(final String shortName, final String message) {
2686    if (message != null && message.length() > 0) {
2687      System.err.println(message);
2688    }
2689    System.err.print("Usage: hbase " + shortName);
2690    System.err.println("  <OPTIONS> [-D<property=value>]* <command|class> <nclients>");
2691    System.err.println();
2692    System.err.println("General Options:");
2693    System.err.println(
2694      " nomapred        Run multiple clients using threads " + "(rather than use mapreduce)");
2695    System.err
2696      .println(" oneCon          all the threads share the same connection. Default: False");
2697    System.err.println(" connCount          connections all threads share. "
2698      + "For example, if set to 2, then all thread share 2 connection. "
2699      + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, "
2700      + "if not, connCount=thread number");
2701
2702    System.err.println(" sampleRate      Execute test on a sample of total "
2703      + "rows. Only supported by randomRead. Default: 1.0");
2704    System.err.println(" period          Report every 'period' rows: "
2705      + "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10);
2706    System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
2707    System.err.println(
2708      " traceRate       Enable HTrace spans. Initiate tracing every N rows. " + "Default: 0");
2709    System.err.println(" latency         Set to report operation latencies. Default: False");
2710    System.err.println(" latencyThreshold  Set to report number of operations with latency "
2711      + "over lantencyThreshold, unit in millisecond, default 0");
2712    System.err.println(" measureAfter    Start to measure the latency once 'measureAfter'"
2713      + " rows have been treated. Default: 0");
2714    System.err
2715      .println(" valueSize       Pass value size to use: Default: " + DEFAULT_OPTS.getValueSize());
2716    System.err.println(" valueRandom     Set if we should vary value size between 0 and "
2717      + "'valueSize'; set on read for stats on size: Default: Not set.");
2718    System.err.println(" blockEncoding   Block encoding to use. Value should be one of "
2719      + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2720    System.err.println();
2721    System.err.println("Table Creation / Write Tests:");
2722    System.err.println(" table           Alternate table name. Default: 'TestTable'");
2723    System.err.println(
2724      " rows            Rows each client runs. Default: " + DEFAULT_OPTS.getPerClientRunRows()
2725        + ".  In case of randomReads and randomSeekScans this could"
2726        + " be specified along with --size to specify the number of rows to be scanned within"
2727        + " the total range specified by the size.");
2728    System.err.println(
2729      " size            Total size in GiB. Mutually exclusive with --rows for writes and scans"
2730        + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2731        + " use size to specify the end range and --rows"
2732        + " specifies the number of rows within that range. " + "Default: 1.0.");
2733    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2734    System.err.println(" encryption      Encryption type to use (AES, ...). Default: 'NONE'");
2735    System.err.println(
2736      " flushCommits    Used to determine if the test should flush the table. " + "Default: false");
2737    System.err.println(" valueZipf       Set if we should vary value size between 0 and "
2738      + "'valueSize' in zipf form: Default: Not set.");
2739    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
2740    System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
2741    System.err.println(" multiPut        Batch puts together into groups of N. Only supported "
2742      + "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0");
2743    System.err.println(" presplit        Create presplit table. If a table with same name exists,"
2744      + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2745      + "Recommended for accurate perf analysis (see guide). Default: disabled");
2746    System.err.println(
2747      " usetags         Writes tags along with KVs. Use with HFile V3. " + "Default: false");
2748    System.err.println(" numoftags       Specify the no of tags that would be needed. "
2749      + "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2750    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
2751    System.err.println(" columns         Columns to write per row. Default: 1");
2752    System.err
2753      .println(" families        Specify number of column families for the table. Default: 1");
2754    System.err.println();
2755    System.err.println("Read Tests:");
2756    System.err.println(" filterAll       Helps to filter out all the rows on the server side"
2757      + " there by not returning any thing back to the client.  Helps to check the server side"
2758      + " performance.  Uses FilterAllFilter internally. ");
2759    System.err.println(" multiGet        Batch gets together into groups of N. Only supported "
2760      + "by randomRead. Default: disabled");
2761    System.err.println(" inmemory        Tries to keep the HFiles of the CF "
2762      + "inmemory as far as possible. Not guaranteed that reads are always served "
2763      + "from memory.  Default: false");
2764    System.err
2765      .println(" bloomFilter     Bloom filter type, one of " + Arrays.toString(BloomType.values()));
2766    System.err.println(" blockSize       Blocksize to use when writing out hfiles. ");
2767    System.err
2768      .println(" inmemoryCompaction  Makes the column family to do inmemory flushes/compactions. "
2769        + "Uses the CompactingMemstore");
2770    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
2771    System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
2772    System.err.println(
2773      " randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
2774    System.err.println(" caching         Scan caching to use. Default: 30");
2775    System.err.println(" asyncPrefetch   Enable asyncPrefetch for scan");
2776    System.err.println(" cacheBlocks     Set the cacheBlocks option for scan. Default: true");
2777    System.err.println(
2778      " scanReadType    Set the readType option for scan, stream/pread/default. Default: default");
2779    System.err.println(" bufferSize      Set the value of client side buffering. Default: 2MB");
2780    System.err.println();
2781    System.err.println(" Note: -D properties will be applied to the conf used. ");
2782    System.err.println("  For example: ");
2783    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
2784    System.err.println("   -Dmapreduce.task.timeout=60000");
2785    System.err.println();
2786    System.err.println("Command:");
2787    for (CmdDescriptor command : COMMANDS.values()) {
2788      System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
2789    }
2790    System.err.println();
2791    System.err.println("Class:");
2792    System.err.println("To run any custom implementation of PerformanceEvaluation.Test, "
2793      + "provide the classname of the implementaion class in place of "
2794      + "command name and it will be loaded at runtime from classpath.:");
2795    System.err.println("Please consider to contribute back "
2796      + "this custom test impl into a builtin PE command for the benefit of the community");
2797    System.err.println();
2798    System.err.println("Args:");
2799    System.err.println(" nclients        Integer. Required. Total number of clients "
2800      + "(and HRegionServers) running. 1 <= value <= 500");
2801    System.err.println("Examples:");
2802    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
2803    System.err.println(" $ hbase " + shortName + " sequentialWrite 1");
2804    System.err.println(" To run 10 clients doing increments over ten rows:");
2805    System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10");
2806  }
2807
2808  /**
2809   * Parse options passed in via an arguments array. Assumes that array has been split on
2810   * white-space and placed into a {@code Queue}. Any unknown arguments will remain in the queue at
2811   * the conclusion of this method call. It's up to the caller to deal with these unrecognized
2812   * arguments.
2813   */
2814  static TestOptions parseOpts(Queue<String> args) {
2815    TestOptions opts = new TestOptions();
2816
2817    String cmd = null;
2818    while ((cmd = args.poll()) != null) {
2819      if (cmd.equals("-h") || cmd.startsWith("--h")) {
2820        // place item back onto queue so that caller knows parsing was incomplete
2821        args.add(cmd);
2822        break;
2823      }
2824
2825      final String nmr = "--nomapred";
2826      if (cmd.startsWith(nmr)) {
2827        opts.nomapred = true;
2828        continue;
2829      }
2830
2831      final String rows = "--rows=";
2832      if (cmd.startsWith(rows)) {
2833        opts.perClientRunRows = Long.parseLong(cmd.substring(rows.length()));
2834        continue;
2835      }
2836
2837      final String cycles = "--cycles=";
2838      if (cmd.startsWith(cycles)) {
2839        opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
2840        continue;
2841      }
2842
2843      final String sampleRate = "--sampleRate=";
2844      if (cmd.startsWith(sampleRate)) {
2845        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
2846        continue;
2847      }
2848
2849      final String table = "--table=";
2850      if (cmd.startsWith(table)) {
2851        opts.tableName = cmd.substring(table.length());
2852        continue;
2853      }
2854
2855      final String startRow = "--startRow=";
2856      if (cmd.startsWith(startRow)) {
2857        opts.startRow = Long.parseLong(cmd.substring(startRow.length()));
2858        continue;
2859      }
2860
2861      final String compress = "--compress=";
2862      if (cmd.startsWith(compress)) {
2863        opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
2864        continue;
2865      }
2866
2867      final String encryption = "--encryption=";
2868      if (cmd.startsWith(encryption)) {
2869        opts.encryption = cmd.substring(encryption.length());
2870        continue;
2871      }
2872
2873      final String traceRate = "--traceRate=";
2874      if (cmd.startsWith(traceRate)) {
2875        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
2876        continue;
2877      }
2878
2879      final String blockEncoding = "--blockEncoding=";
2880      if (cmd.startsWith(blockEncoding)) {
2881        opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
2882        continue;
2883      }
2884
2885      final String flushCommits = "--flushCommits=";
2886      if (cmd.startsWith(flushCommits)) {
2887        opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
2888        continue;
2889      }
2890
2891      final String writeToWAL = "--writeToWAL=";
2892      if (cmd.startsWith(writeToWAL)) {
2893        opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
2894        continue;
2895      }
2896
2897      final String presplit = "--presplit=";
2898      if (cmd.startsWith(presplit)) {
2899        opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
2900        continue;
2901      }
2902
2903      final String inMemory = "--inmemory=";
2904      if (cmd.startsWith(inMemory)) {
2905        opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
2906        continue;
2907      }
2908
2909      final String autoFlush = "--autoFlush=";
2910      if (cmd.startsWith(autoFlush)) {
2911        opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
2912        continue;
2913      }
2914
2915      final String onceCon = "--oneCon=";
2916      if (cmd.startsWith(onceCon)) {
2917        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
2918        continue;
2919      }
2920
2921      final String connCount = "--connCount=";
2922      if (cmd.startsWith(connCount)) {
2923        opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
2924        continue;
2925      }
2926
2927      final String latencyThreshold = "--latencyThreshold=";
2928      if (cmd.startsWith(latencyThreshold)) {
2929        opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length()));
2930        continue;
2931      }
2932
2933      final String latency = "--latency";
2934      if (cmd.startsWith(latency)) {
2935        opts.reportLatency = true;
2936        continue;
2937      }
2938
2939      final String multiGet = "--multiGet=";
2940      if (cmd.startsWith(multiGet)) {
2941        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
2942        continue;
2943      }
2944
2945      final String multiPut = "--multiPut=";
2946      if (cmd.startsWith(multiPut)) {
2947        opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
2948        continue;
2949      }
2950
2951      final String useTags = "--usetags=";
2952      if (cmd.startsWith(useTags)) {
2953        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
2954        continue;
2955      }
2956
2957      final String noOfTags = "--numoftags=";
2958      if (cmd.startsWith(noOfTags)) {
2959        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
2960        continue;
2961      }
2962
2963      final String replicas = "--replicas=";
2964      if (cmd.startsWith(replicas)) {
2965        opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
2966        continue;
2967      }
2968
2969      final String filterOutAll = "--filterAll";
2970      if (cmd.startsWith(filterOutAll)) {
2971        opts.filterAll = true;
2972        continue;
2973      }
2974
2975      final String size = "--size=";
2976      if (cmd.startsWith(size)) {
2977        opts.size = Float.parseFloat(cmd.substring(size.length()));
2978        if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB");
2979        continue;
2980      }
2981
2982      final String splitPolicy = "--splitPolicy=";
2983      if (cmd.startsWith(splitPolicy)) {
2984        opts.splitPolicy = cmd.substring(splitPolicy.length());
2985        continue;
2986      }
2987
2988      final String randomSleep = "--randomSleep=";
2989      if (cmd.startsWith(randomSleep)) {
2990        opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
2991        continue;
2992      }
2993
2994      final String measureAfter = "--measureAfter=";
2995      if (cmd.startsWith(measureAfter)) {
2996        opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length()));
2997        continue;
2998      }
2999
3000      final String bloomFilter = "--bloomFilter=";
3001      if (cmd.startsWith(bloomFilter)) {
3002        opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
3003        continue;
3004      }
3005
3006      final String blockSize = "--blockSize=";
3007      if (cmd.startsWith(blockSize)) {
3008        opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
3009        continue;
3010      }
3011
3012      final String valueSize = "--valueSize=";
3013      if (cmd.startsWith(valueSize)) {
3014        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
3015        continue;
3016      }
3017
3018      final String valueRandom = "--valueRandom";
3019      if (cmd.startsWith(valueRandom)) {
3020        opts.valueRandom = true;
3021        continue;
3022      }
3023
3024      final String valueZipf = "--valueZipf";
3025      if (cmd.startsWith(valueZipf)) {
3026        opts.valueZipf = true;
3027        continue;
3028      }
3029
3030      final String period = "--period=";
3031      if (cmd.startsWith(period)) {
3032        opts.period = Integer.parseInt(cmd.substring(period.length()));
3033        continue;
3034      }
3035
3036      final String addColumns = "--addColumns=";
3037      if (cmd.startsWith(addColumns)) {
3038        opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
3039        continue;
3040      }
3041
3042      final String inMemoryCompaction = "--inmemoryCompaction=";
3043      if (cmd.startsWith(inMemoryCompaction)) {
3044        opts.inMemoryCompaction =
3045          MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length()));
3046        continue;
3047      }
3048
3049      final String columns = "--columns=";
3050      if (cmd.startsWith(columns)) {
3051        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
3052        continue;
3053      }
3054
3055      final String families = "--families=";
3056      if (cmd.startsWith(families)) {
3057        opts.families = Integer.parseInt(cmd.substring(families.length()));
3058        continue;
3059      }
3060
3061      final String caching = "--caching=";
3062      if (cmd.startsWith(caching)) {
3063        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
3064        continue;
3065      }
3066
3067      final String asyncPrefetch = "--asyncPrefetch";
3068      if (cmd.startsWith(asyncPrefetch)) {
3069        opts.asyncPrefetch = true;
3070        continue;
3071      }
3072
3073      final String cacheBlocks = "--cacheBlocks=";
3074      if (cmd.startsWith(cacheBlocks)) {
3075        opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length()));
3076        continue;
3077      }
3078
3079      final String scanReadType = "--scanReadType=";
3080      if (cmd.startsWith(scanReadType)) {
3081        opts.scanReadType =
3082          Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
3083        continue;
3084      }
3085
3086      final String bufferSize = "--bufferSize=";
3087      if (cmd.startsWith(bufferSize)) {
3088        opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
3089        continue;
3090      }
3091
3092      final String commandPropertiesFile = "--commandPropertiesFile=";
3093      if (cmd.startsWith(commandPropertiesFile)) {
3094        String fileName = String.valueOf(cmd.substring(commandPropertiesFile.length()));
3095        Properties properties = new Properties();
3096        try {
3097          properties
3098            .load(PerformanceEvaluation.class.getClassLoader().getResourceAsStream(fileName));
3099          opts.commandProperties = properties;
3100        } catch (IOException e) {
3101          LOG.error("Failed to load metricIds from properties file", e);
3102        }
3103        continue;
3104      }
3105
3106      validateParsedOpts(opts);
3107
3108      if (isCommandClass(cmd)) {
3109        opts.cmdName = cmd;
3110        try {
3111          opts.numClientThreads = Integer.parseInt(args.remove());
3112        } catch (NoSuchElementException | NumberFormatException e) {
3113          throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
3114        }
3115        opts = calculateRowsAndSize(opts);
3116        break;
3117      } else {
3118        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
3119      }
3120
3121      // Not matching any option or command.
3122      System.err.println("Error: Wrong option or command: " + cmd);
3123      args.add(cmd);
3124      break;
3125    }
3126    return opts;
3127  }
3128
3129  /**
3130   * Validates opts after all the opts are parsed, so that caller need not to maintain order of opts
3131   */
3132  private static void validateParsedOpts(TestOptions opts) {
3133
3134    if (!opts.autoFlush && opts.multiPut > 0) {
3135      throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
3136    }
3137
3138    if (opts.oneCon && opts.connCount > 1) {
3139      throw new IllegalArgumentException(
3140        "oneCon is set to true, " + "connCount should not bigger than 1");
3141    }
3142
3143    if (opts.valueZipf && opts.valueRandom) {
3144      throw new IllegalStateException("Either valueZipf or valueRandom but not both");
3145    }
3146  }
3147
3148  static TestOptions calculateRowsAndSize(final TestOptions opts) {
3149    int rowsPerGB = getRowsPerGB(opts);
3150    if (
3151      (opts.getCmdName() != null
3152        && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
3153        && opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows
3154    ) {
3155      opts.totalRows = (long) (opts.size * rowsPerGB);
3156    } else if (opts.size != DEFAULT_OPTS.size) {
3157      // total size in GB specified
3158      opts.totalRows = (long) (opts.size * rowsPerGB);
3159      opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
3160    } else {
3161      opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
3162      // Cast to float to ensure floating-point division
3163      opts.size = (float) opts.totalRows / rowsPerGB;
3164    }
3165    return opts;
3166  }
3167
3168  static int getRowsPerGB(final TestOptions opts) {
3169    return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies()
3170      * opts.getColumns());
3171  }
3172
3173  @Override
3174  public int run(String[] args) throws Exception {
3175    // Process command-line args. TODO: Better cmd-line processing
3176    // (but hopefully something not as painful as cli options).
3177    int errCode = -1;
3178    if (args.length < 1) {
3179      printUsage();
3180      return errCode;
3181    }
3182
3183    try {
3184      LinkedList<String> argv = new LinkedList<>();
3185      argv.addAll(Arrays.asList(args));
3186      TestOptions opts = parseOpts(argv);
3187
3188      // args remaining, print help and exit
3189      if (!argv.isEmpty()) {
3190        errCode = 0;
3191        printUsage();
3192        return errCode;
3193      }
3194
3195      // must run at least 1 client
3196      if (opts.numClientThreads <= 0) {
3197        throw new IllegalArgumentException("Number of clients must be > 0");
3198      }
3199
3200      // cmdName should not be null, print help and exit
3201      if (opts.cmdName == null) {
3202        printUsage();
3203        return errCode;
3204      }
3205
3206      Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
3207      if (cmdClass != null) {
3208        runTest(cmdClass, opts);
3209        errCode = 0;
3210      }
3211
3212    } catch (Exception e) {
3213      e.printStackTrace();
3214    }
3215
3216    return errCode;
3217  }
3218
3219  private static boolean isCommandClass(String cmd) {
3220    return COMMANDS.containsKey(cmd) || isCustomTestClass(cmd);
3221  }
3222
3223  private static boolean isCustomTestClass(String cmd) {
3224    Class<? extends Test> cmdClass;
3225    try {
3226      cmdClass =
3227        (Class<? extends Test>) PerformanceEvaluation.class.getClassLoader().loadClass(cmd);
3228      addCommandDescriptor(cmdClass, cmd, "custom command");
3229      return true;
3230    } catch (Throwable th) {
3231      LOG.info("No class found for command: " + cmd, th);
3232      return false;
3233    }
3234  }
3235
3236  private static Class<? extends TestBase> determineCommandClass(String cmd) {
3237    CmdDescriptor descriptor = COMMANDS.get(cmd);
3238    return descriptor != null ? descriptor.getCmdClass() : null;
3239  }
3240
3241  public static void main(final String[] args) throws Exception {
3242    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
3243    System.exit(res);
3244  }
3245}