001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.rest; 019 020import java.io.DataInput; 021import java.io.DataOutput; 022import java.io.IOException; 023import java.io.PrintStream; 024import java.lang.reflect.Constructor; 025import java.text.SimpleDateFormat; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Date; 029import java.util.List; 030import java.util.Map; 031import java.util.Random; 032import java.util.TreeMap; 033import java.util.regex.Matcher; 034import java.util.regex.Pattern; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.conf.Configured; 037import org.apache.hadoop.fs.FSDataInputStream; 038import org.apache.hadoop.fs.FileStatus; 039import org.apache.hadoop.fs.FileSystem; 040import org.apache.hadoop.fs.Path; 041import org.apache.hadoop.hbase.ArrayBackedTag; 042import org.apache.hadoop.hbase.CompareOperator; 043import org.apache.hadoop.hbase.HBaseConfiguration; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.KeyValue; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.Tag; 048import 
org.apache.hadoop.hbase.client.BufferedMutator; 049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 050import org.apache.hadoop.hbase.client.Connection; 051import org.apache.hadoop.hbase.client.ConnectionFactory; 052import org.apache.hadoop.hbase.client.Durability; 053import org.apache.hadoop.hbase.client.Get; 054import org.apache.hadoop.hbase.client.Put; 055import org.apache.hadoop.hbase.client.Result; 056import org.apache.hadoop.hbase.client.ResultScanner; 057import org.apache.hadoop.hbase.client.Scan; 058import org.apache.hadoop.hbase.client.Table; 059import org.apache.hadoop.hbase.client.TableDescriptor; 060import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 061import org.apache.hadoop.hbase.filter.BinaryComparator; 062import org.apache.hadoop.hbase.filter.Filter; 063import org.apache.hadoop.hbase.filter.PageFilter; 064import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 065import org.apache.hadoop.hbase.filter.WhileMatchFilter; 066import org.apache.hadoop.hbase.io.compress.Compression; 067import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 068import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 069import org.apache.hadoop.hbase.rest.client.Client; 070import org.apache.hadoop.hbase.rest.client.Cluster; 071import org.apache.hadoop.hbase.rest.client.RemoteAdmin; 072import org.apache.hadoop.hbase.util.ByteArrayHashKey; 073import org.apache.hadoop.hbase.util.Bytes; 074import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 075import org.apache.hadoop.hbase.util.Hash; 076import org.apache.hadoop.hbase.util.MurmurHash; 077import org.apache.hadoop.hbase.util.Pair; 078import org.apache.hadoop.io.LongWritable; 079import org.apache.hadoop.io.NullWritable; 080import org.apache.hadoop.io.Text; 081import org.apache.hadoop.io.Writable; 082import org.apache.hadoop.mapreduce.InputSplit; 083import org.apache.hadoop.mapreduce.Job; 084import org.apache.hadoop.mapreduce.JobContext; 085import 
org.apache.hadoop.mapreduce.Mapper; 086import org.apache.hadoop.mapreduce.RecordReader; 087import org.apache.hadoop.mapreduce.TaskAttemptContext; 088import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 089import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 090import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; 091import org.apache.hadoop.util.LineReader; 092import org.apache.hadoop.util.Tool; 093import org.apache.hadoop.util.ToolRunner; 094import org.slf4j.Logger; 095import org.slf4j.LoggerFactory; 096 097/** 098 * Script used evaluating Stargate performance and scalability. Runs a SG client that steps through 099 * one of a set of hardcoded tests or 'experiments' (e.g. a random reads test, a random writes test, 100 * etc.). Pass on the command-line which test to run and how many clients are participating in this 101 * experiment. Run <code>java PerformanceEvaluation --help</code> to obtain usage. 102 * <p> 103 * This class sets up and runs the evaluation programs described in Section 7, <i>Performance 104 * Evaluation</i>, of the <a href="http://labs.google.com/papers/bigtable.html">Bigtable</a> paper, 105 * pages 8-10. 106 * <p> 107 * If number of clients > 1, we start up a MapReduce job. Each map task runs an individual client. 108 * Each client does about 1GB of data. 
 */
public class PerformanceEvaluation extends Configured implements Tool {
  protected static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class);

  private static final int DEFAULT_ROW_PREFIX_LENGTH = 16;
  // Sizing constants. ONE_GB is 1024 * 1024 * 1000 (not 2^30), so dividing it by the
  // 1000-byte ROW_LENGTH gives exactly 1024 * 1024 rows per "GB".
  private static final int ROW_LENGTH = 1000;
  private static final int TAG_LENGTH = 256;
  private static final int ONE_GB = 1024 * 1024 * 1000;
  private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH;

  // Default table, column family and qualifier used by the tests.
  public static final TableName TABLE_NAME = TableName.valueOf("TestTable");
  public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
  public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
  // Table the tests run against; starts out as TABLE_NAME.
  private TableName tableName = TABLE_NAME;

  // Despite the constant-style name this is a mutable instance field: getDescriptor() builds it
  // lazily on first use and returns the cached instance afterwards.
  protected TableDescriptor TABLE_DESCRIPTOR;
  // Registered test commands keyed by command name; filled by addCommandDescriptor().
  protected Map<String, CmdDescriptor> commands = new TreeMap<>();
  // REST cluster handed to the REST Client in runNIsMoreThanOne(); shared by all instances.
  protected static Cluster cluster = new Cluster();

  // Assigned by the constructor.
  volatile Configuration conf;
  // When true, runNIsMoreThanOne() runs the clients as threads in this JVM instead of
  // submitting a MapReduce job.
  private boolean nomapred = false;
  // Number of clients: one thread each in doMultipleClients(), and the "clients=" value written
  // to the MapReduce input file.
  private int N = 1;
  // Total number of rows across all clients ("totalRows=" in the MapReduce input file).
  private int R = ROWS_PER_GB;
  // Column family settings applied by getDescriptor().
  private Compression.Algorithm compression = Compression.Algorithm.NONE;
  private DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
  private boolean flushCommits = true;
  private boolean writeToWAL = true;
  private boolean inMemoryCF = false;
  // A value greater than zero makes checkTable() delete an existing table and create it again.
  private int presplitRegions = 0;
  private boolean useTags = false;
  private int noOfTags = 1;
  private Connection connection;

  // Base directory for MapReduce job files; writeInputFile() creates a timestamped
  // subdirectory underneath it.
  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");

  /**
   * Regex to parse lines in input file passed to mapreduce task. The nine capture groups are, in
   * order: tableName, startRow, perClientRunRows, totalRows, clients, flushCommits, writeToWAL,
   * useTags and noOfTags. The table name group is {@code \w+}, so only word characters match.
   */
  public static final Pattern LINE_PATTERN = Pattern
    .compile("tableName=(\\w+),\\s+" + "startRow=(\\d+),\\s+" + "perClientRunRows=(\\d+),\\s+"
      + "totalRows=(\\d+),\\s+" + "clients=(\\d+),\\s+" + "flushCommits=(\\w+),\\s+"
      + "writeToWAL=(\\w+),\\s+" + "useTags=(\\w+),\\s+" + "noOfTags=(\\d+)");

  /**
   * Enum for map metrics.
Keep it out here rather than inside in the Map inner-class so we can find 154 * associated properties. 155 */ 156 protected enum Counter { 157 /** elapsed time */ 158 ELAPSED_TIME, 159 /** number of rows */ 160 ROWS 161 } 162 163 /** 164 * Constructor 165 * @param c Configuration object 166 */ 167 public PerformanceEvaluation(final Configuration c) { 168 this.conf = c; 169 170 addCommandDescriptor(RandomReadTest.class, "randomRead", "Run random read test"); 171 addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan", 172 "Run random seek and scan 100 test"); 173 addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", 174 "Run random seek scan with both start and stop row (max 10 rows)"); 175 addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100", 176 "Run random seek scan with both start and stop row (max 100 rows)"); 177 addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000", 178 "Run random seek scan with both start and stop row (max 1000 rows)"); 179 addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000", 180 "Run random seek scan with both start and stop row (max 10000 rows)"); 181 addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test"); 182 addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test"); 183 addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test"); 184 addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)"); 185 addCommandDescriptor(FilteredScanTest.class, "filterScan", 186 "Run scan test using a filter to find a specific row based " 187 + "on it's value (make sure to use --rows=20)"); 188 } 189 190 protected void addCommandDescriptor(Class<? 
extends Test> cmdClass, String name, 191 String description) { 192 CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description); 193 commands.put(name, cmdDescriptor); 194 } 195 196 /** 197 * Implementations can have their status set. 198 */ 199 interface Status { 200 /** 201 * Sets status 202 * @param msg status message 203 * @throws IOException if setting the status fails 204 */ 205 void setStatus(final String msg) throws IOException; 206 } 207 208 /** 209 * This class works as the InputSplit of Performance Evaluation MapReduce InputFormat, and the 210 * Record Value of RecordReader. Each map task will only read one record from a PeInputSplit, the 211 * record value is the PeInputSplit itself. 212 */ 213 public static class PeInputSplit extends InputSplit implements Writable { 214 private TableName tableName; 215 private int startRow; 216 private int rows; 217 private int totalRows; 218 private int clients; 219 private boolean flushCommits; 220 private boolean writeToWAL; 221 private boolean useTags; 222 private int noOfTags; 223 224 public PeInputSplit(TableName tableName, int startRow, int rows, int totalRows, int clients, 225 boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags) { 226 this.tableName = tableName; 227 this.startRow = startRow; 228 this.rows = rows; 229 this.totalRows = totalRows; 230 this.clients = clients; 231 this.flushCommits = flushCommits; 232 this.writeToWAL = writeToWAL; 233 this.useTags = useTags; 234 this.noOfTags = noOfTags; 235 } 236 237 @Override 238 public void readFields(DataInput in) throws IOException { 239 int tableNameLen = in.readInt(); 240 byte[] name = new byte[tableNameLen]; 241 in.readFully(name); 242 this.tableName = TableName.valueOf(name); 243 this.startRow = in.readInt(); 244 this.rows = in.readInt(); 245 this.totalRows = in.readInt(); 246 this.clients = in.readInt(); 247 this.flushCommits = in.readBoolean(); 248 this.writeToWAL = in.readBoolean(); 249 this.useTags = in.readBoolean(); 
250 this.noOfTags = in.readInt(); 251 } 252 253 @Override 254 public void write(DataOutput out) throws IOException { 255 byte[] name = this.tableName.toBytes(); 256 out.writeInt(name.length); 257 out.write(name); 258 out.writeInt(startRow); 259 out.writeInt(rows); 260 out.writeInt(totalRows); 261 out.writeInt(clients); 262 out.writeBoolean(flushCommits); 263 out.writeBoolean(writeToWAL); 264 out.writeBoolean(useTags); 265 out.writeInt(noOfTags); 266 } 267 268 @Override 269 public long getLength() { 270 return 0; 271 } 272 273 @Override 274 public String[] getLocations() { 275 return new String[0]; 276 } 277 278 public int getStartRow() { 279 return startRow; 280 } 281 282 public TableName getTableName() { 283 return tableName; 284 } 285 286 public int getRows() { 287 return rows; 288 } 289 290 public int getTotalRows() { 291 return totalRows; 292 } 293 294 public boolean isFlushCommits() { 295 return flushCommits; 296 } 297 298 public boolean isWriteToWAL() { 299 return writeToWAL; 300 } 301 302 public boolean isUseTags() { 303 return useTags; 304 } 305 306 public int getNoOfTags() { 307 return noOfTags; 308 } 309 } 310 311 /** 312 * InputFormat of Performance Evaluation MapReduce job. It extends from FileInputFormat, want to 313 * use it's methods such as setInputPaths(). 
314 */ 315 public static class PeInputFormat extends FileInputFormat<NullWritable, PeInputSplit> { 316 @Override 317 public List<InputSplit> getSplits(JobContext job) throws IOException { 318 // generate splits 319 List<InputSplit> splitList = new ArrayList<>(); 320 321 for (FileStatus file : listStatus(job)) { 322 if (file.isDirectory()) { 323 continue; 324 } 325 Path path = file.getPath(); 326 FileSystem fs = path.getFileSystem(job.getConfiguration()); 327 FSDataInputStream fileIn = fs.open(path); 328 LineReader in = new LineReader(fileIn, job.getConfiguration()); 329 int lineLen; 330 while (true) { 331 Text lineText = new Text(); 332 lineLen = in.readLine(lineText); 333 if (lineLen <= 0) { 334 break; 335 } 336 Matcher m = LINE_PATTERN.matcher(lineText.toString()); 337 if ((m != null) && m.matches()) { 338 TableName tableName = TableName.valueOf(m.group(1)); 339 int startRow = Integer.parseInt(m.group(2)); 340 int rows = Integer.parseInt(m.group(3)); 341 int totalRows = Integer.parseInt(m.group(4)); 342 int clients = Integer.parseInt(m.group(5)); 343 boolean flushCommits = Boolean.parseBoolean(m.group(6)); 344 boolean writeToWAL = Boolean.parseBoolean(m.group(7)); 345 boolean useTags = Boolean.parseBoolean(m.group(8)); 346 int noOfTags = Integer.parseInt(m.group(9)); 347 348 LOG.debug("tableName=" + tableName + " split[" + splitList.size() + "] " + " startRow=" 349 + startRow + " rows=" + rows + " totalRows=" + totalRows + " clients=" + clients 350 + " flushCommits=" + flushCommits + " writeToWAL=" + writeToWAL + " useTags=" 351 + useTags + " noOfTags=" + noOfTags); 352 353 PeInputSplit newSplit = new PeInputSplit(tableName, startRow, rows, totalRows, clients, 354 flushCommits, writeToWAL, useTags, noOfTags); 355 splitList.add(newSplit); 356 } 357 } 358 in.close(); 359 } 360 361 LOG.info("Total # of splits: " + splitList.size()); 362 return splitList; 363 } 364 365 @Override 366 public RecordReader<NullWritable, PeInputSplit> createRecordReader(InputSplit split, 
367 TaskAttemptContext context) { 368 return new PeRecordReader(); 369 } 370 371 public static class PeRecordReader extends RecordReader<NullWritable, PeInputSplit> { 372 private boolean readOver = false; 373 private PeInputSplit split = null; 374 private NullWritable key = null; 375 private PeInputSplit value = null; 376 377 @Override 378 public void initialize(InputSplit split, TaskAttemptContext context) { 379 this.readOver = false; 380 this.split = (PeInputSplit) split; 381 } 382 383 @Override 384 public boolean nextKeyValue() { 385 if (readOver) { 386 return false; 387 } 388 389 key = NullWritable.get(); 390 value = split; 391 392 readOver = true; 393 return true; 394 } 395 396 @Override 397 public NullWritable getCurrentKey() { 398 return key; 399 } 400 401 @Override 402 public PeInputSplit getCurrentValue() { 403 return value; 404 } 405 406 @Override 407 public float getProgress() { 408 if (readOver) { 409 return 1.0f; 410 } else { 411 return 0.0f; 412 } 413 } 414 415 @Override 416 public void close() { 417 // do nothing 418 } 419 } 420 } 421 422 /** 423 * MapReduce job that runs a performance evaluation client in each map task. 424 */ 425 public static class EvaluationMapTask 426 extends Mapper<NullWritable, PeInputSplit, LongWritable, LongWritable> { 427 428 /** configuration parameter name that contains the command */ 429 public final static String CMD_KEY = "EvaluationMapTask.command"; 430 /** configuration parameter name that contains the PE impl */ 431 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl"; 432 433 private Class<? extends Test> cmd; 434 private PerformanceEvaluation pe; 435 436 @Override 437 protected void setup(Context context) { 438 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class); 439 440 // this is required so that extensions of PE are instantiated within the 441 // map reduce task... 442 Class<? 
extends PerformanceEvaluation> peClass = 443 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class); 444 try { 445 this.pe = 446 peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration()); 447 } catch (Exception e) { 448 throw new IllegalStateException("Could not instantiate PE instance", e); 449 } 450 } 451 452 private <Type> Class<? extends Type> forName(String className, Class<Type> type) { 453 Class<? extends Type> clazz; 454 try { 455 clazz = Class.forName(className).asSubclass(type); 456 } catch (ClassNotFoundException e) { 457 throw new IllegalStateException("Could not find class for name: " + className, e); 458 } 459 return clazz; 460 } 461 462 @Override 463 protected void map(NullWritable key, PeInputSplit value, final Context context) 464 throws IOException, InterruptedException { 465 Status status = context::setStatus; 466 467 // Evaluation task 468 pe.tableName = value.getTableName(); 469 long elapsedTime = 470 this.pe.runOneClient(this.cmd, value.getStartRow(), value.getRows(), value.getTotalRows(), 471 value.isFlushCommits(), value.isWriteToWAL(), value.isUseTags(), value.getNoOfTags(), 472 ConnectionFactory.createConnection(context.getConfiguration()), status); 473 // Collect how much time the thing took. Report as map output and 474 // to the ELAPSED_TIME counter. 475 context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime); 476 context.getCounter(Counter.ROWS).increment(value.rows); 477 context.write(new LongWritable(value.startRow), new LongWritable(elapsedTime)); 478 context.progress(); 479 } 480 } 481 482 /** 483 * If table does not already exist, create. 484 * @param admin Client to use checking. 485 * @return True if we created the table. 
486 * @throws IOException if an operation on the table fails 487 */ 488 private boolean checkTable(RemoteAdmin admin) throws IOException { 489 TableDescriptor tableDescriptor = getDescriptor(); 490 if (this.presplitRegions > 0) { 491 // presplit requested 492 if (admin.isTableAvailable(tableDescriptor.getTableName().getName())) { 493 admin.deleteTable(tableDescriptor.getTableName().getName()); 494 } 495 496 byte[][] splits = getSplits(); 497 for (int i = 0; i < splits.length; i++) { 498 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); 499 } 500 admin.createTable(tableDescriptor); 501 LOG.info("Table created with " + this.presplitRegions + " splits"); 502 } else { 503 boolean tableExists = admin.isTableAvailable(tableDescriptor.getTableName().getName()); 504 if (!tableExists) { 505 admin.createTable(tableDescriptor); 506 LOG.info("Table " + tableDescriptor + " created"); 507 } 508 } 509 510 return admin.isTableAvailable(tableDescriptor.getTableName().getName()); 511 } 512 513 protected TableDescriptor getDescriptor() { 514 if (TABLE_DESCRIPTOR == null) { 515 TABLE_DESCRIPTOR = 516 TableDescriptorBuilder.newBuilder(tableName) 517 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME) 518 .setDataBlockEncoding(blockEncoding).setCompressionType(compression) 519 .setInMemory(inMemoryCF).build()) 520 .build(); 521 } 522 return TABLE_DESCRIPTOR; 523 } 524 525 /** 526 * Generates splits based on total number of rows and specified split regions 527 * @return splits : array of byte [] 528 */ 529 protected byte[][] getSplits() { 530 if (this.presplitRegions == 0) { 531 return new byte[0][]; 532 } 533 534 int numSplitPoints = presplitRegions - 1; 535 byte[][] splits = new byte[numSplitPoints][]; 536 int jump = this.R / this.presplitRegions; 537 for (int i = 0; i < numSplitPoints; i++) { 538 int rowkey = jump * (1 + i); 539 splits[i] = format(rowkey); 540 } 541 return splits; 542 } 543 544 /** 545 * We're to run multiple clients concurrently. 
Setup a mapreduce job. Run one map per client. Then 546 * run a single reduce to sum the elapsed times. 547 * @param cmd Command to run. 548 */ 549 private void runNIsMoreThanOne(final Class<? extends Test> cmd) 550 throws IOException, InterruptedException, ClassNotFoundException { 551 RemoteAdmin remoteAdmin = new RemoteAdmin(new Client(cluster), getConf()); 552 checkTable(remoteAdmin); 553 if (nomapred) { 554 doMultipleClients(cmd); 555 } else { 556 doMapReduce(cmd); 557 } 558 } 559 560 /** 561 * Run all clients in this vm each to its own thread. 562 * @param cmd Command to run 563 * @throws IOException if creating a connection fails 564 */ 565 private void doMultipleClients(final Class<? extends Test> cmd) throws IOException { 566 final List<Thread> threads = new ArrayList<>(this.N); 567 final long[] timings = new long[this.N]; 568 final int perClientRows = R / N; 569 final TableName tableName = this.tableName; 570 final DataBlockEncoding encoding = this.blockEncoding; 571 final boolean flushCommits = this.flushCommits; 572 final Compression.Algorithm compression = this.compression; 573 final boolean writeToWal = this.writeToWAL; 574 final int preSplitRegions = this.presplitRegions; 575 final boolean useTags = this.useTags; 576 final int numTags = this.noOfTags; 577 final Connection connection = ConnectionFactory.createConnection(getConf()); 578 for (int i = 0; i < this.N; i++) { 579 final int index = i; 580 Thread t = new Thread("TestClient-" + i) { 581 @Override 582 public void run() { 583 super.run(); 584 PerformanceEvaluation pe = new PerformanceEvaluation(getConf()); 585 pe.tableName = tableName; 586 pe.blockEncoding = encoding; 587 pe.flushCommits = flushCommits; 588 pe.compression = compression; 589 pe.writeToWAL = writeToWal; 590 pe.presplitRegions = preSplitRegions; 591 pe.N = N; 592 pe.connection = connection; 593 pe.useTags = useTags; 594 pe.noOfTags = numTags; 595 try { 596 long elapsedTime = pe.runOneClient(cmd, index * perClientRows, perClientRows, 
R, 597 flushCommits, writeToWAL, useTags, noOfTags, connection, 598 msg -> LOG.info("client-" + getName() + " " + msg)); 599 timings[index] = elapsedTime; 600 LOG.info("Finished " + getName() + " in " + elapsedTime + "ms writing " + perClientRows 601 + " rows"); 602 } catch (IOException e) { 603 throw new RuntimeException(e); 604 } 605 } 606 }; 607 threads.add(t); 608 } 609 for (Thread t : threads) { 610 t.start(); 611 } 612 for (Thread t : threads) { 613 while (t.isAlive()) { 614 try { 615 t.join(); 616 } catch (InterruptedException e) { 617 LOG.debug("Interrupted, continuing" + e.toString()); 618 } 619 } 620 } 621 final String test = cmd.getSimpleName(); 622 LOG.info("[" + test + "] Summary of timings (ms): " + Arrays.toString(timings)); 623 Arrays.sort(timings); 624 long total = 0; 625 for (int i = 0; i < this.N; i++) { 626 total += timings[i]; 627 } 628 LOG.info("[" + test + "]" + "\tMin: " + timings[0] + "ms" + "\tMax: " + timings[this.N - 1] 629 + "ms" + "\tAvg: " + (total / this.N) + "ms"); 630 } 631 632 /** 633 * Run a mapreduce job. Run as many maps as asked-for clients. Before we start up the job, write 634 * out an input file with instruction per client regards which row they are to start on. 635 * @param cmd Command to run. 636 */ 637 private void doMapReduce(final Class<? 
extends Test> cmd) 638 throws IOException, InterruptedException, ClassNotFoundException { 639 Configuration conf = getConf(); 640 Path inputDir = writeInputFile(conf); 641 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); 642 conf.set(EvaluationMapTask.PE_KEY, getClass().getName()); 643 Job job = Job.getInstance(conf); 644 job.setJarByClass(PerformanceEvaluation.class); 645 job.setJobName("HBase Performance Evaluation"); 646 647 job.setInputFormatClass(PeInputFormat.class); 648 PeInputFormat.setInputPaths(job, inputDir); 649 650 job.setOutputKeyClass(LongWritable.class); 651 job.setOutputValueClass(LongWritable.class); 652 653 job.setMapperClass(EvaluationMapTask.class); 654 job.setReducerClass(LongSumReducer.class); 655 job.setNumReduceTasks(1); 656 657 job.setOutputFormatClass(TextOutputFormat.class); 658 TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs")); 659 TableMapReduceUtil.addDependencyJars(job); 660 TableMapReduceUtil.initCredentials(job); 661 job.waitForCompletion(true); 662 } 663 664 /** 665 * Write input file of offsets-per-client for the mapreduce job. 666 * @param c Configuration 667 * @return Directory that contains file written. 668 * @throws IOException if creating the directory or the file fails 669 */ 670 private Path writeInputFile(final Configuration c) throws IOException { 671 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); 672 Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date())); 673 Path inputDir = new Path(jobdir, "inputs"); 674 675 FileSystem fs = FileSystem.get(c); 676 fs.mkdirs(inputDir); 677 Path inputFile = new Path(inputDir, "input.txt"); 678 // Make input random. 
679 try (PrintStream out = new PrintStream(fs.create(inputFile))) { 680 Map<Integer, String> m = new TreeMap<>(); 681 Hash h = MurmurHash.getInstance(); 682 int perClientRows = (this.R / this.N); 683 for (int i = 0; i < 10; i++) { 684 for (int j = 0; j < N; j++) { 685 StringBuilder s = new StringBuilder(); 686 s.append("tableName=").append(tableName); 687 s.append(", startRow=").append((j * perClientRows) + (i * (perClientRows / 10))); 688 s.append(", perClientRunRows=").append(perClientRows / 10); 689 s.append(", totalRows=").append(R); 690 s.append(", clients=").append(N); 691 s.append(", flushCommits=").append(flushCommits); 692 s.append(", writeToWAL=").append(writeToWAL); 693 s.append(", useTags=").append(useTags); 694 s.append(", noOfTags=").append(noOfTags); 695 696 byte[] b = Bytes.toBytes(s.toString()); 697 int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1); 698 m.put(hash, s.toString()); 699 } 700 } 701 for (Map.Entry<Integer, String> e : m.entrySet()) { 702 out.println(e.getValue()); 703 } 704 } 705 return inputDir; 706 } 707 708 /** 709 * Describes a command. 710 */ 711 static class CmdDescriptor { 712 private Class<? extends Test> cmdClass; 713 private String name; 714 private String description; 715 716 CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) { 717 this.cmdClass = cmdClass; 718 this.name = name; 719 this.description = description; 720 } 721 722 public Class<? extends Test> getCmdClass() { 723 return cmdClass; 724 } 725 726 public String getName() { 727 return name; 728 } 729 730 public String getDescription() { 731 return description; 732 } 733 } 734 735 /** 736 * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation} tests This 737 * makes the reflection logic a little easier to understand... 
738 */ 739 static class TestOptions { 740 private int startRow; 741 private int perClientRunRows; 742 private int totalRows; 743 private TableName tableName; 744 private boolean flushCommits; 745 private boolean writeToWAL; 746 private boolean useTags; 747 private int noOfTags; 748 private Connection connection; 749 750 TestOptions(int startRow, int perClientRunRows, int totalRows, TableName tableName, 751 boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags, 752 Connection connection) { 753 this.startRow = startRow; 754 this.perClientRunRows = perClientRunRows; 755 this.totalRows = totalRows; 756 this.tableName = tableName; 757 this.flushCommits = flushCommits; 758 this.writeToWAL = writeToWAL; 759 this.useTags = useTags; 760 this.noOfTags = noOfTags; 761 this.connection = connection; 762 } 763 764 public int getStartRow() { 765 return startRow; 766 } 767 768 public int getPerClientRunRows() { 769 return perClientRunRows; 770 } 771 772 public int getTotalRows() { 773 return totalRows; 774 } 775 776 public TableName getTableName() { 777 return tableName; 778 } 779 780 public boolean isFlushCommits() { 781 return flushCommits; 782 } 783 784 public boolean isWriteToWAL() { 785 return writeToWAL; 786 } 787 788 public Connection getConnection() { 789 return connection; 790 } 791 792 public boolean isUseTags() { 793 return this.useTags; 794 } 795 796 public int getNumTags() { 797 return this.noOfTags; 798 } 799 } 800 801 /* 802 * A test. Subclass to particularize what happens per row. 803 */ 804 static abstract class Test { 805 // Below is make it so when Tests are all running in the one 806 // jvm, that they each have a differently seeded Random. 
807 private static final Random randomSeed = new Random(EnvironmentEdgeManager.currentTime()); 808 809 private static long nextRandomSeed() { 810 return randomSeed.nextLong(); 811 } 812 813 protected final Random rand = new Random(nextRandomSeed()); 814 815 protected final int startRow; 816 protected final int perClientRunRows; 817 protected final int totalRows; 818 private final Status status; 819 protected TableName tableName; 820 protected volatile Configuration conf; 821 protected boolean writeToWAL; 822 protected boolean useTags; 823 protected int noOfTags; 824 protected Connection connection; 825 826 /** 827 * Note that all subclasses of this class must provide a public contructor that has the exact 828 * same list of arguments. 829 */ 830 Test(final Configuration conf, final TestOptions options, final Status status) { 831 super(); 832 this.startRow = options.getStartRow(); 833 this.perClientRunRows = options.getPerClientRunRows(); 834 this.totalRows = options.getTotalRows(); 835 this.status = status; 836 this.tableName = options.getTableName(); 837 this.conf = conf; 838 this.writeToWAL = options.isWriteToWAL(); 839 this.useTags = options.isUseTags(); 840 this.noOfTags = options.getNumTags(); 841 this.connection = options.getConnection(); 842 } 843 844 protected String generateStatus(final int sr, final int i, final int lr) { 845 return sr + "/" + i + "/" + lr; 846 } 847 848 protected int getReportingPeriod() { 849 int period = this.perClientRunRows / 10; 850 return period == 0 ? this.perClientRunRows : period; 851 } 852 853 abstract void testTakedown() throws IOException; 854 855 /** 856 * Run test 857 * @return Elapsed time. 
858 * @throws IOException if something in the test fails 859 */ 860 long test() throws IOException { 861 testSetup(); 862 LOG.info("Timed test starting in thread " + Thread.currentThread().getName()); 863 final long startTime = System.nanoTime(); 864 try { 865 testTimed(); 866 } finally { 867 testTakedown(); 868 } 869 return (System.nanoTime() - startTime) / 1000000; 870 } 871 872 abstract void testSetup() throws IOException; 873 874 /** 875 * Provides an extension point for tests that don't want a per row invocation. 876 */ 877 void testTimed() throws IOException { 878 int lastRow = this.startRow + this.perClientRunRows; 879 // Report on completion of 1/10th of total. 880 for (int i = this.startRow; i < lastRow; i++) { 881 testRow(i); 882 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { 883 status.setStatus(generateStatus(this.startRow, i, lastRow)); 884 } 885 } 886 } 887 888 /** 889 * Test for individual row. 890 * @param i Row index. 891 */ 892 abstract void testRow(final int i) throws IOException; 893 } 894 895 static abstract class TableTest extends Test { 896 protected Table table; 897 898 public TableTest(Configuration conf, TestOptions options, Status status) { 899 super(conf, options, status); 900 } 901 902 @Override 903 void testSetup() throws IOException { 904 this.table = connection.getTable(tableName); 905 } 906 907 @Override 908 void testTakedown() throws IOException { 909 table.close(); 910 } 911 } 912 913 static abstract class BufferedMutatorTest extends Test { 914 protected BufferedMutator mutator; 915 protected boolean flushCommits; 916 917 public BufferedMutatorTest(Configuration conf, TestOptions options, Status status) { 918 super(conf, options, status); 919 this.flushCommits = options.isFlushCommits(); 920 } 921 922 @Override 923 void testSetup() throws IOException { 924 this.mutator = connection.getBufferedMutator(tableName); 925 } 926 927 @Override 928 void testTakedown() throws IOException { 929 if (flushCommits) { 930 
this.mutator.flush(); 931 } 932 mutator.close(); 933 } 934 } 935 936 static class RandomSeekScanTest extends TableTest { 937 RandomSeekScanTest(Configuration conf, TestOptions options, Status status) { 938 super(conf, options, status); 939 } 940 941 @Override 942 void testRow(final int i) throws IOException { 943 Scan scan = new Scan().withStartRow(getRandomRow(this.rand, this.totalRows)); 944 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 945 scan.setFilter(new WhileMatchFilter(new PageFilter(120))); 946 ResultScanner s = this.table.getScanner(scan); 947 s.close(); 948 } 949 950 @Override 951 protected int getReportingPeriod() { 952 int period = this.perClientRunRows / 100; 953 return period == 0 ? this.perClientRunRows : period; 954 } 955 } 956 957 @SuppressWarnings("unused") 958 static abstract class RandomScanWithRangeTest extends TableTest { 959 RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) { 960 super(conf, options, status); 961 } 962 963 @Override 964 void testRow(final int i) throws IOException { 965 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow(); 966 Scan scan = new Scan().withStartRow(startAndStopRow.getFirst()) 967 .withStopRow(startAndStopRow.getSecond()); 968 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 969 ResultScanner s = this.table.getScanner(scan); 970 int count = 0; 971 for (Result rr = null; (rr = s.next()) != null;) { 972 count++; 973 } 974 975 if (i % 100 == 0) { 976 LOG.info(String.format("Scan for key range %s - %s returned %s rows", 977 Bytes.toString(startAndStopRow.getFirst()), Bytes.toString(startAndStopRow.getSecond()), 978 count)); 979 } 980 981 s.close(); 982 } 983 984 protected abstract Pair<byte[], byte[]> getStartAndStopRow(); 985 986 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) { 987 int start = this.rand.nextInt(Integer.MAX_VALUE) % totalRows; 988 int stop = start + maxRange; 989 return new Pair<>(format(start), format(stop)); 990 } 991 992 @Override 993 
protected int getReportingPeriod() { 994 int period = this.perClientRunRows / 100; 995 return period == 0 ? this.perClientRunRows : period; 996 } 997 } 998 999 static class RandomScanWithRange10Test extends RandomScanWithRangeTest { 1000 RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status) { 1001 super(conf, options, status); 1002 } 1003 1004 @Override 1005 protected Pair<byte[], byte[]> getStartAndStopRow() { 1006 return generateStartAndStopRows(10); 1007 } 1008 } 1009 1010 static class RandomScanWithRange100Test extends RandomScanWithRangeTest { 1011 RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status) { 1012 super(conf, options, status); 1013 } 1014 1015 @Override 1016 protected Pair<byte[], byte[]> getStartAndStopRow() { 1017 return generateStartAndStopRows(100); 1018 } 1019 } 1020 1021 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest { 1022 RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status) { 1023 super(conf, options, status); 1024 } 1025 1026 @Override 1027 protected Pair<byte[], byte[]> getStartAndStopRow() { 1028 return generateStartAndStopRows(1000); 1029 } 1030 } 1031 1032 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest { 1033 RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status) { 1034 super(conf, options, status); 1035 } 1036 1037 @Override 1038 protected Pair<byte[], byte[]> getStartAndStopRow() { 1039 return generateStartAndStopRows(10000); 1040 } 1041 } 1042 1043 static class RandomReadTest extends TableTest { 1044 RandomReadTest(Configuration conf, TestOptions options, Status status) { 1045 super(conf, options, status); 1046 } 1047 1048 @Override 1049 void testRow(final int i) throws IOException { 1050 Get get = new Get(getRandomRow(this.rand, this.totalRows)); 1051 get.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1052 this.table.get(get); 1053 } 1054 1055 @Override 1056 protected int 
getReportingPeriod() { 1057 int period = this.perClientRunRows / 100; 1058 return period == 0 ? this.perClientRunRows : period; 1059 } 1060 } 1061 1062 static class RandomWriteTest extends BufferedMutatorTest { 1063 RandomWriteTest(Configuration conf, TestOptions options, Status status) { 1064 super(conf, options, status); 1065 } 1066 1067 @Override 1068 void testRow(final int i) throws IOException { 1069 byte[] row = getRandomRow(this.rand, this.totalRows); 1070 Put put = new Put(row); 1071 byte[] value = generateData(this.rand, ROW_LENGTH); 1072 if (useTags) { 1073 byte[] tag = generateData(this.rand, TAG_LENGTH); 1074 Tag[] tags = new Tag[noOfTags]; 1075 for (int n = 0; n < noOfTags; n++) { 1076 Tag t = new ArrayBackedTag((byte) n, tag); 1077 tags[n] = t; 1078 } 1079 KeyValue kv = 1080 new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP, value, tags); 1081 put.add(kv); 1082 } else { 1083 put.addColumn(FAMILY_NAME, QUALIFIER_NAME, value); 1084 } 1085 put.setDurability(writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL); 1086 mutator.mutate(put); 1087 } 1088 } 1089 1090 static class ScanTest extends TableTest { 1091 private ResultScanner testScanner; 1092 1093 ScanTest(Configuration conf, TestOptions options, Status status) { 1094 super(conf, options, status); 1095 } 1096 1097 @Override 1098 void testTakedown() throws IOException { 1099 if (this.testScanner != null) { 1100 this.testScanner.close(); 1101 } 1102 super.testTakedown(); 1103 } 1104 1105 @Override 1106 void testRow(final int i) throws IOException { 1107 if (this.testScanner == null) { 1108 Scan scan = new Scan().withStartRow(format(this.startRow)); 1109 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1110 this.testScanner = table.getScanner(scan); 1111 } 1112 testScanner.next(); 1113 } 1114 } 1115 1116 static class SequentialReadTest extends TableTest { 1117 SequentialReadTest(Configuration conf, TestOptions options, Status status) { 1118 super(conf, options, status); 1119 } 1120 1121 @Override 1122 void testRow(final int i) throws IOException { 1123 Get get = new Get(format(i)); 1124 get.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1125 table.get(get); 1126 } 1127 } 1128 1129 static class SequentialWriteTest extends BufferedMutatorTest { 1130 SequentialWriteTest(Configuration conf, TestOptions options, Status status) { 1131 super(conf, options, status); 1132 } 1133 1134 @Override 1135 void testRow(final int i) throws IOException { 1136 byte[] row = format(i); 1137 Put put = new Put(row); 1138 byte[] value = generateData(this.rand, ROW_LENGTH); 1139 if (useTags) { 1140 byte[] tag = generateData(this.rand, TAG_LENGTH); 1141 Tag[] tags = new Tag[noOfTags]; 1142 for (int n = 0; n < noOfTags; n++) { 1143 Tag t = new ArrayBackedTag((byte) n, tag); 1144 tags[n] = t; 1145 } 1146 KeyValue kv = 1147 new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP, value, tags); 1148 put.add(kv); 1149 } else { 1150 put.addColumn(FAMILY_NAME, QUALIFIER_NAME, value); 1151 } 1152 
put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); 1153 mutator.mutate(put); 1154 } 1155 } 1156 1157 static class FilteredScanTest extends TableTest { 1158 protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName()); 1159 1160 FilteredScanTest(Configuration conf, TestOptions options, Status status) { 1161 super(conf, options, status); 1162 } 1163 1164 @Override 1165 void testRow(int i) throws IOException { 1166 byte[] value = generateValue(this.rand); 1167 Scan scan = constructScan(value); 1168 try (ResultScanner scanner = this.table.getScanner(scan)) { 1169 while (scanner.next() != null) { 1170 } 1171 } 1172 } 1173 1174 protected Scan constructScan(byte[] valuePrefix) { 1175 Filter filter = new SingleColumnValueFilter(FAMILY_NAME, QUALIFIER_NAME, 1176 CompareOperator.EQUAL, new BinaryComparator(valuePrefix)); 1177 Scan scan = new Scan(); 1178 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); 1179 scan.setFilter(filter); 1180 return scan; 1181 } 1182 } 1183 1184 /** 1185 * Format passed integer. 1186 * @param number the integer to format 1187 * @return Returns zero-prefixed 10-byte wide decimal version of passed number (Does absolute in 1188 * case number is negative). 
1189 */ 1190 public static byte[] format(final int number) { 1191 byte[] b = new byte[DEFAULT_ROW_PREFIX_LENGTH + 10]; 1192 int d = Math.abs(number); 1193 for (int i = b.length - 1; i >= 0; i--) { 1194 b[i] = (byte) ((d % 10) + '0'); 1195 d /= 10; 1196 } 1197 return b; 1198 } 1199 1200 public static byte[] generateData(final Random r, int length) { 1201 byte[] b = new byte[length]; 1202 int i; 1203 1204 for (i = 0; i < (length - 8); i += 8) { 1205 b[i] = (byte) (65 + r.nextInt(26)); 1206 b[i + 1] = b[i]; 1207 b[i + 2] = b[i]; 1208 b[i + 3] = b[i]; 1209 b[i + 4] = b[i]; 1210 b[i + 5] = b[i]; 1211 b[i + 6] = b[i]; 1212 b[i + 7] = b[i]; 1213 } 1214 1215 byte a = (byte) (65 + r.nextInt(26)); 1216 for (; i < length; i++) { 1217 b[i] = a; 1218 } 1219 return b; 1220 } 1221 1222 public static byte[] generateValue(final Random r) { 1223 byte[] b = new byte[ROW_LENGTH]; 1224 r.nextBytes(b); 1225 return b; 1226 } 1227 1228 static byte[] getRandomRow(final Random random, final int totalRows) { 1229 return format(random.nextInt(Integer.MAX_VALUE) % totalRows); 1230 } 1231 1232 long runOneClient(final Class<? extends Test> cmd, final int startRow, final int perClientRunRows, 1233 final int totalRows, boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags, 1234 Connection connection, final Status status) throws IOException { 1235 status 1236 .setStatus("Start " + cmd + " at offset " + startRow + " for " + perClientRunRows + " rows"); 1237 long totalElapsedTime; 1238 1239 TestOptions options = new TestOptions(startRow, perClientRunRows, totalRows, tableName, 1240 flushCommits, writeToWAL, useTags, noOfTags, connection); 1241 final Test t; 1242 try { 1243 Constructor<? 
extends Test> constructor = 1244 cmd.getDeclaredConstructor(Configuration.class, TestOptions.class, Status.class); 1245 t = constructor.newInstance(this.conf, options, status); 1246 } catch (NoSuchMethodException e) { 1247 throw new IllegalArgumentException("Invalid command class: " + cmd.getName() 1248 + ". It does not provide a constructor as described by" 1249 + "the javadoc comment. Available constructors are: " 1250 + Arrays.toString(cmd.getConstructors())); 1251 } catch (Exception e) { 1252 throw new IllegalStateException("Failed to construct command class", e); 1253 } 1254 totalElapsedTime = t.test(); 1255 1256 status.setStatus("Finished " + cmd + " in " + totalElapsedTime + "ms at offset " + startRow 1257 + " for " + perClientRunRows + " rows"); 1258 return totalElapsedTime; 1259 } 1260 1261 private void runNIsOne(final Class<? extends Test> cmd) { 1262 Status status = LOG::info; 1263 1264 RemoteAdmin admin; 1265 try { 1266 Client client = new Client(cluster); 1267 admin = new RemoteAdmin(client, getConf()); 1268 checkTable(admin); 1269 runOneClient(cmd, 0, this.R, this.R, this.flushCommits, this.writeToWAL, this.useTags, 1270 this.noOfTags, this.connection, status); 1271 } catch (Exception e) { 1272 LOG.error("Failed", e); 1273 } 1274 } 1275 1276 private void runTest(final Class<? extends Test> cmd) 1277 throws IOException, InterruptedException, ClassNotFoundException { 1278 if (N == 1) { 1279 // If there is only one client and one HRegionServer, we assume nothing 1280 // has been set up at all. 
1281 runNIsOne(cmd); 1282 } else { 1283 // Else, run 1284 runNIsMoreThanOne(cmd); 1285 } 1286 } 1287 1288 protected void printUsage() { 1289 printUsage(null); 1290 } 1291 1292 protected void printUsage(final String message) { 1293 if (message != null && message.length() > 0) { 1294 System.err.println(message); 1295 } 1296 System.err.println("Usage: java " + this.getClass().getName() + " \\"); 1297 System.err.println(" [--nomapred] [--rows=ROWS] [--table=NAME] \\"); 1298 System.err.println( 1299 " [--compress=TYPE] [--blockEncoding=TYPE] " + "[-D<property=value>]* <command> <nclients>"); 1300 System.err.println(); 1301 System.err.println("General Options:"); 1302 System.err.println( 1303 " nomapred Run multiple clients using threads " + "(rather than use mapreduce)"); 1304 System.err.println(" rows Rows each client runs. Default: One million"); 1305 System.err.println(); 1306 System.err.println("Table Creation / Write Tests:"); 1307 System.err.println(" table Alternate table name. Default: 'TestTable'"); 1308 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); 1309 System.err.println( 1310 " flushCommits Used to determine if the test should flush the table. " + "Default: false"); 1311 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); 1312 System.err.println(" presplit Create presplit table. Recommended for accurate perf " 1313 + "analysis (see guide). Default: disabled"); 1314 System.err.println( 1315 " usetags Writes tags along with KVs. Use with HFile V3. " + "Default : false"); 1316 System.err.println(" numoftags Specify the no of tags that would be needed. " 1317 + "This works only if usetags is true."); 1318 System.err.println(); 1319 System.err.println("Read Tests:"); 1320 System.err.println(" inmemory Tries to keep the HFiles of the CF inmemory as far as " 1321 + "possible. Not guaranteed that reads are always served from inmemory. 
Default: false"); 1322 System.err.println(); 1323 System.err.println(" Note: -D properties will be applied to the conf used. "); 1324 System.err.println(" For example: "); 1325 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true"); 1326 System.err.println(" -Dmapreduce.task.timeout=60000"); 1327 System.err.println(); 1328 System.err.println("Command:"); 1329 for (CmdDescriptor command : commands.values()) { 1330 System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription())); 1331 } 1332 System.err.println(); 1333 System.err.println("Args:"); 1334 System.err.println( 1335 " nclients Integer. Required. Total number of " + "clients (and HRegionServers)"); 1336 System.err.println(" running: 1 <= value <= 500"); 1337 System.err.println("Examples:"); 1338 System.err.println(" To run a single evaluation client:"); 1339 System.err.println(" $ hbase " + this.getClass().getName() + " sequentialWrite 1"); 1340 } 1341 1342 private void getArgs(final int start, final String[] args) { 1343 if (start + 1 > args.length) { 1344 throw new IllegalArgumentException("must supply the number of clients"); 1345 } 1346 N = Integer.parseInt(args[start]); 1347 if (N < 1) { 1348 throw new IllegalArgumentException("Number of clients must be > 1"); 1349 } 1350 // Set total number of rows to write. 1351 R = R * N; 1352 } 1353 1354 @Override 1355 public int run(String[] args) throws Exception { 1356 // Process command-line args. TODO: Better cmd-line processing 1357 // (but hopefully something not as painful as cli options). 
1358 int errCode = -1; 1359 if (args.length < 1) { 1360 printUsage(); 1361 return errCode; 1362 } 1363 1364 try { 1365 for (int i = 0; i < args.length; i++) { 1366 String cmd = args[i]; 1367 if (cmd.equals("-h") || cmd.startsWith("--h")) { 1368 printUsage(); 1369 errCode = 0; 1370 break; 1371 } 1372 1373 final String nmr = "--nomapred"; 1374 if (cmd.startsWith(nmr)) { 1375 nomapred = true; 1376 continue; 1377 } 1378 1379 final String rows = "--rows="; 1380 if (cmd.startsWith(rows)) { 1381 R = Integer.parseInt(cmd.substring(rows.length())); 1382 continue; 1383 } 1384 1385 final String table = "--table="; 1386 if (cmd.startsWith(table)) { 1387 this.tableName = TableName.valueOf(cmd.substring(table.length())); 1388 continue; 1389 } 1390 1391 final String compress = "--compress="; 1392 if (cmd.startsWith(compress)) { 1393 this.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); 1394 continue; 1395 } 1396 1397 final String blockEncoding = "--blockEncoding="; 1398 if (cmd.startsWith(blockEncoding)) { 1399 this.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); 1400 continue; 1401 } 1402 1403 final String flushCommits = "--flushCommits="; 1404 if (cmd.startsWith(flushCommits)) { 1405 this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); 1406 continue; 1407 } 1408 1409 final String writeToWAL = "--writeToWAL="; 1410 if (cmd.startsWith(writeToWAL)) { 1411 this.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); 1412 continue; 1413 } 1414 1415 final String presplit = "--presplit="; 1416 if (cmd.startsWith(presplit)) { 1417 this.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); 1418 continue; 1419 } 1420 1421 final String inMemory = "--inmemory="; 1422 if (cmd.startsWith(inMemory)) { 1423 this.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); 1424 continue; 1425 } 1426 1427 this.connection = 
ConnectionFactory.createConnection(getConf()); 1428 1429 final String useTags = "--usetags="; 1430 if (cmd.startsWith(useTags)) { 1431 this.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); 1432 continue; 1433 } 1434 1435 final String noOfTags = "--nooftags="; 1436 if (cmd.startsWith(noOfTags)) { 1437 this.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); 1438 continue; 1439 } 1440 1441 final String host = "--host="; 1442 if (cmd.startsWith(host)) { 1443 cluster.add(cmd.substring(host.length())); 1444 continue; 1445 } 1446 1447 Class<? extends Test> cmdClass = determineCommandClass(cmd); 1448 if (cmdClass != null) { 1449 getArgs(i + 1, args); 1450 if (cluster.isEmpty()) { 1451 String s = conf.get("stargate.hostname", "localhost"); 1452 if (s.contains(":")) { 1453 cluster.add(s); 1454 } else { 1455 cluster.add(s, conf.getInt("stargate.port", 8080)); 1456 } 1457 } 1458 runTest(cmdClass); 1459 errCode = 0; 1460 break; 1461 } 1462 1463 printUsage(); 1464 break; 1465 } 1466 } catch (Exception e) { 1467 LOG.error("Failed", e); 1468 } 1469 1470 return errCode; 1471 } 1472 1473 private Class<? extends Test> determineCommandClass(String cmd) { 1474 CmdDescriptor descriptor = commands.get(cmd); 1475 return descriptor != null ? descriptor.getCmdClass() : null; 1476 } 1477 1478 public static void main(final String[] args) throws Exception { 1479 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); 1480 System.exit(res); 1481 } 1482}