001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.concurrent.ThreadLocalRandom;
022import org.apache.commons.math3.random.RandomData;
023import org.apache.commons.math3.random.RandomDataImpl;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
028import org.apache.hadoop.hbase.io.crypto.CryptoCipherProvider;
029import org.apache.hadoop.hbase.io.crypto.DefaultCipherProvider;
030import org.apache.hadoop.hbase.io.crypto.Encryption;
031import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
032import org.apache.hadoop.hbase.io.crypto.aes.AES;
033import org.apache.hadoop.hbase.io.hfile.CacheConfig;
034import org.apache.hadoop.hbase.io.hfile.HFile;
035import org.apache.hadoop.hbase.io.hfile.HFileContext;
036import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
037import org.apache.hadoop.hbase.io.hfile.HFileScanner;
038import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * This class runs performance benchmarks for {@link HFile}.
047 */
048@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
049public class HFilePerformanceEvaluation {
050  private static final int ROW_LENGTH = 10;
051  private static final int ROW_COUNT = 1000000;
052  private static final int RFILE_BLOCKSIZE = 8 * 1024;
053  private static StringBuilder testSummary = new StringBuilder();
054
055  // Disable verbose INFO logging from org.apache.hadoop.io.compress.CodecPool
056  static {
057    System.setProperty("org.apache.commons.logging.Log",
058      "org.apache.commons.logging.impl.SimpleLog");
059    System.setProperty(
060      "org.apache.commons.logging.simplelog.log.org.apache.hadoop.io.compress.CodecPool", "WARN");
061  }
062
063  private static final Logger LOG =
064    LoggerFactory.getLogger(HFilePerformanceEvaluation.class.getName());
065
066  static byte[] format(final int i) {
067    String v = Integer.toString(i);
068    return Bytes.toBytes("0000000000".substring(v.length()) + v);
069  }
070
071  static ImmutableBytesWritable format(final int i, ImmutableBytesWritable w) {
072    w.set(format(i));
073    return w;
074  }
075
076  static Cell createCell(final int i) {
077    return createCell(i, HConstants.EMPTY_BYTE_ARRAY);
078  }
079
080  /**
081   * HFile is Cell-based. It used to be byte arrays. Doing this test, pass Cells. All Cells
082   * intentionally have same coordinates in all fields but row.
083   * @param i     Integer to format as a row Key.
084   * @param value Value to use
085   * @return Created Cell.
086   */
087  static Cell createCell(final int i, final byte[] value) {
088    return createCell(format(i), value);
089  }
090
091  static Cell createCell(final byte[] keyRow) {
092    return CellUtil.createCell(keyRow);
093  }
094
095  static Cell createCell(final byte[] keyRow, final byte[] value) {
096    return CellUtil.createCell(keyRow, value);
097  }
098
099  /**
100   * Add any supported codec or cipher to test the HFile read/write performance. Specify "none" to
101   * disable codec or cipher or both.
102   */
103  private void runBenchmarks() throws Exception {
104    final Configuration conf = new Configuration();
105    final FileSystem fs = FileSystem.get(conf);
106    final Path mf = fs.makeQualified(new Path("performanceevaluation.mapfile"));
107
108    // codec=none cipher=none
109    runWriteBenchmark(conf, fs, mf, "none", "none");
110    runReadBenchmark(conf, fs, mf, "none", "none");
111
112    // codec=gz cipher=none
113    runWriteBenchmark(conf, fs, mf, "gz", "none");
114    runReadBenchmark(conf, fs, mf, "gz", "none");
115
116    // Add configuration for AES cipher
117    final Configuration aesconf = new Configuration();
118    aesconf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
119    aesconf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
120    aesconf.setInt("hfile.format.version", 3);
121    final FileSystem aesfs = FileSystem.get(aesconf);
122    final Path aesmf = aesfs.makeQualified(new Path("performanceevaluation.aes.mapfile"));
123
124    // codec=none cipher=aes
125    runWriteBenchmark(aesconf, aesfs, aesmf, "none", "aes");
126    runReadBenchmark(aesconf, aesfs, aesmf, "none", "aes");
127
128    // codec=gz cipher=aes
129    runWriteBenchmark(aesconf, aesfs, aesmf, "gz", "aes");
130    runReadBenchmark(aesconf, aesfs, aesmf, "gz", "aes");
131
132    // Add configuration for Commons cipher
133    final Configuration cryptoconf = new Configuration();
134    cryptoconf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
135    cryptoconf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
136    cryptoconf.setInt("hfile.format.version", 3);
137    cryptoconf.set(HConstants.CRYPTO_CIPHERPROVIDER_CONF_KEY, CryptoCipherProvider.class.getName());
138    final FileSystem cryptofs = FileSystem.get(cryptoconf);
139    final Path cryptof = cryptofs.makeQualified(new Path("performanceevaluation.aes.mapfile"));
140
141    // codec=none cipher=aes
142    runWriteBenchmark(cryptoconf, cryptofs, aesmf, "none", "aes");
143    runReadBenchmark(cryptoconf, cryptofs, aesmf, "none", "aes");
144
145    // codec=gz cipher=aes
146    runWriteBenchmark(cryptoconf, aesfs, aesmf, "gz", "aes");
147    runReadBenchmark(cryptoconf, aesfs, aesmf, "gz", "aes");
148
149    // cleanup test files
150    if (fs.exists(mf)) {
151      fs.delete(mf, true);
152    }
153    if (aesfs.exists(aesmf)) {
154      aesfs.delete(aesmf, true);
155    }
156    if (cryptofs.exists(aesmf)) {
157      cryptofs.delete(cryptof, true);
158    }
159
160    // Print Result Summary
161    LOG.info("\n***************\n" + "Result Summary" + "\n***************\n");
162    LOG.info(testSummary.toString());
163
164  }
165
166  /**
167   * Write a test HFile with the given codec & cipher
168   * @param codec  "none", "lzo", "gz", "snappy"
169   * @param cipher "none", "aes"
170   */
171  private void runWriteBenchmark(Configuration conf, FileSystem fs, Path mf, String codec,
172    String cipher) throws Exception {
173    if (fs.exists(mf)) {
174      fs.delete(mf, true);
175    }
176
177    runBenchmark(new SequentialWriteBenchmark(conf, fs, mf, ROW_COUNT, codec, cipher), ROW_COUNT,
178      codec, getCipherName(conf, cipher));
179
180  }
181
182  /**
183   * Run all the read benchmarks for the test HFile
184   * @param codec  "none", "lzo", "gz", "snappy"
185   * @param cipher "none", "aes"
186   */
187  private void runReadBenchmark(final Configuration conf, final FileSystem fs, final Path mf,
188    final String codec, final String cipher) {
189    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
190      @Override
191      public void run() {
192        try {
193          runBenchmark(new UniformRandomSmallScan(conf, fs, mf, ROW_COUNT), ROW_COUNT, codec,
194            getCipherName(conf, cipher));
195        } catch (Exception e) {
196          testSummary.append("UniformRandomSmallScan failed " + e.getMessage());
197          e.printStackTrace();
198        }
199      }
200    });
201
202    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
203      @Override
204      public void run() {
205        try {
206          runBenchmark(new UniformRandomReadBenchmark(conf, fs, mf, ROW_COUNT), ROW_COUNT, codec,
207            getCipherName(conf, cipher));
208        } catch (Exception e) {
209          testSummary.append("UniformRandomReadBenchmark failed " + e.getMessage());
210          e.printStackTrace();
211        }
212      }
213    });
214
215    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
216      @Override
217      public void run() {
218        try {
219          runBenchmark(new GaussianRandomReadBenchmark(conf, fs, mf, ROW_COUNT), ROW_COUNT, codec,
220            getCipherName(conf, cipher));
221        } catch (Exception e) {
222          testSummary.append("GaussianRandomReadBenchmark failed " + e.getMessage());
223          e.printStackTrace();
224        }
225      }
226    });
227
228    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
229      @Override
230      public void run() {
231        try {
232          runBenchmark(new SequentialReadBenchmark(conf, fs, mf, ROW_COUNT), ROW_COUNT, codec,
233            getCipherName(conf, cipher));
234        } catch (Exception e) {
235          testSummary.append("SequentialReadBenchmark failed " + e.getMessage());
236          e.printStackTrace();
237        }
238      }
239    });
240
241  }
242
243  protected void runBenchmark(RowOrientedBenchmark benchmark, int rowCount, String codec,
244    String cipher) throws Exception {
245    LOG.info("Running " + benchmark.getClass().getSimpleName() + " with codec[" + codec + "] "
246      + "cipher[" + cipher + "] for " + rowCount + " rows.");
247
248    long elapsedTime = benchmark.run();
249
250    LOG.info("Running " + benchmark.getClass().getSimpleName() + " with codec[" + codec + "] "
251      + "cipher[" + cipher + "] for " + rowCount + " rows took " + elapsedTime + "ms.");
252
253    // Store results to print summary at the end
254    testSummary.append("Running ").append(benchmark.getClass().getSimpleName())
255      .append(" with codec[").append(codec).append("] cipher[").append(cipher).append("] for ")
256      .append(rowCount).append(" rows took ").append(elapsedTime).append("ms.").append("\n");
257  }
258
259  static abstract class RowOrientedBenchmark {
260
261    protected final Configuration conf;
262    protected final FileSystem fs;
263    protected final Path mf;
264    protected final int totalRows;
265    protected String codec = "none";
266    protected String cipher = "none";
267
268    public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows,
269      String codec, String cipher) {
270      this.conf = conf;
271      this.fs = fs;
272      this.mf = mf;
273      this.totalRows = totalRows;
274      this.codec = codec;
275      this.cipher = cipher;
276    }
277
278    public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows) {
279      this.conf = conf;
280      this.fs = fs;
281      this.mf = mf;
282      this.totalRows = totalRows;
283    }
284
285    void setUp() throws Exception {
286      // do nothing
287    }
288
289    abstract void doRow(int i) throws Exception;
290
291    protected int getReportingPeriod() {
292      return this.totalRows / 10;
293    }
294
295    void tearDown() throws Exception {
296      // do nothing
297    }
298
299    /**
300     * Run benchmark
301     * @return elapsed time.
302     */
303    long run() throws Exception {
304      long elapsedTime;
305      setUp();
306      long startTime = EnvironmentEdgeManager.currentTime();
307      try {
308        for (int i = 0; i < totalRows; i++) {
309          if (i > 0 && i % getReportingPeriod() == 0) {
310            LOG.info("Processed " + i + " rows.");
311          }
312          doRow(i);
313        }
314        elapsedTime = EnvironmentEdgeManager.currentTime() - startTime;
315      } finally {
316        tearDown();
317      }
318      return elapsedTime;
319    }
320
321  }
322
323  static class SequentialWriteBenchmark extends RowOrientedBenchmark {
324    protected HFile.Writer writer;
325    private byte[] bytes = new byte[ROW_LENGTH];
326
327    public SequentialWriteBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows,
328      String codec, String cipher) {
329      super(conf, fs, mf, totalRows, codec, cipher);
330    }
331
332    @Override
333    void setUp() throws Exception {
334
335      HFileContextBuilder builder = new HFileContextBuilder()
336        .withCompression(HFileWriterImpl.compressionByName(codec)).withBlockSize(RFILE_BLOCKSIZE);
337
338      if (cipher == "aes") {
339        byte[] cipherKey = new byte[AES.KEY_LENGTH];
340        Bytes.secureRandom(cipherKey);
341        builder.withEncryptionContext(Encryption.newContext(conf)
342          .setCipher(Encryption.getCipher(conf, cipher)).setKey(cipherKey));
343      } else if (!"none".equals(cipher)) {
344        throw new IOException("Cipher " + cipher + " not supported.");
345      }
346
347      HFileContext hFileContext = builder.build();
348
349      writer =
350        HFile.getWriterFactoryNoCache(conf).withPath(fs, mf).withFileContext(hFileContext).create();
351    }
352
353    @Override
354    void doRow(int i) throws Exception {
355      writer.append(createCell(i, generateValue()));
356    }
357
358    private byte[] generateValue() {
359      Bytes.random(bytes);
360      return bytes;
361    }
362
363    @Override
364    protected int getReportingPeriod() {
365      return this.totalRows; // don't report progress
366    }
367
368    @Override
369    void tearDown() throws Exception {
370      writer.close();
371    }
372
373  }
374
375  static abstract class ReadBenchmark extends RowOrientedBenchmark {
376
377    protected HFile.Reader reader;
378
379    public ReadBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows) {
380      super(conf, fs, mf, totalRows);
381    }
382
383    @Override
384    void setUp() throws Exception {
385      reader = HFile.createReader(this.fs, this.mf, new CacheConfig(this.conf), true, this.conf);
386    }
387
388    @Override
389    void tearDown() throws Exception {
390      reader.close();
391    }
392
393  }
394
395  static class SequentialReadBenchmark extends ReadBenchmark {
396    private HFileScanner scanner;
397
398    public SequentialReadBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows) {
399      super(conf, fs, mf, totalRows);
400    }
401
402    @Override
403    void setUp() throws Exception {
404      super.setUp();
405      this.scanner = this.reader.getScanner(conf, false, false);
406      this.scanner.seekTo();
407    }
408
409    @Override
410    void doRow(int i) throws Exception {
411      if (this.scanner.next()) {
412        // TODO: Fix. Make Scanner do Cells.
413        Cell c = this.scanner.getCell();
414        PerformanceEvaluationCommons.assertKey(format(i + 1), c);
415        PerformanceEvaluationCommons.assertValueSize(ROW_LENGTH, c.getValueLength());
416      }
417    }
418
419    @Override
420    protected int getReportingPeriod() {
421      return this.totalRows; // don't report progress
422    }
423
424  }
425
426  static class UniformRandomReadBenchmark extends ReadBenchmark {
427
428    public UniformRandomReadBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows) {
429      super(conf, fs, mf, totalRows);
430    }
431
432    @Override
433    void doRow(int i) throws Exception {
434      HFileScanner scanner = this.reader.getScanner(conf, false, true);
435      byte[] b = getRandomRow();
436      if (scanner.seekTo(createCell(b)) < 0) {
437        LOG.info("Not able to seekTo " + new String(b));
438        return;
439      }
440      // TODO: Fix scanner so it does Cells
441      Cell c = scanner.getCell();
442      PerformanceEvaluationCommons.assertKey(b, c);
443      PerformanceEvaluationCommons.assertValueSize(ROW_LENGTH, c.getValueLength());
444    }
445
446    private byte[] getRandomRow() {
447      return format(ThreadLocalRandom.current().nextInt(totalRows));
448    }
449  }
450
451  static class UniformRandomSmallScan extends ReadBenchmark {
452
453    public UniformRandomSmallScan(Configuration conf, FileSystem fs, Path mf, int totalRows) {
454      super(conf, fs, mf, totalRows / 10);
455    }
456
457    @Override
458    void doRow(int i) throws Exception {
459      HFileScanner scanner = this.reader.getScanner(conf, false, false);
460      byte[] b = getRandomRow();
461      // System.out.println("Random row: " + new String(b));
462      Cell c = createCell(b);
463      if (scanner.seekTo(c) != 0) {
464        LOG.info("Nonexistent row: " + new String(b));
465        return;
466      }
467      // TODO: HFileScanner doesn't do Cells yet. Temporary fix.
468      c = scanner.getCell();
469      // System.out.println("Found row: " +
470      // new String(c.getRowArray(), c.getRowOffset(), c.getRowLength()));
471      PerformanceEvaluationCommons.assertKey(b, c);
472      for (int ii = 0; ii < 30; ii++) {
473        if (!scanner.next()) {
474          LOG.info("NOTHING FOLLOWS");
475          return;
476        }
477        c = scanner.getCell();
478        PerformanceEvaluationCommons.assertValueSize(ROW_LENGTH, c.getValueLength());
479      }
480    }
481
482    private byte[] getRandomRow() {
483      return format(ThreadLocalRandom.current().nextInt(totalRows));
484    }
485  }
486
487  static class GaussianRandomReadBenchmark extends ReadBenchmark {
488
489    private RandomData randomData = new RandomDataImpl();
490
491    public GaussianRandomReadBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows) {
492      super(conf, fs, mf, totalRows);
493    }
494
495    @Override
496    void doRow(int i) throws Exception {
497      HFileScanner scanner = this.reader.getScanner(conf, false, true);
498      byte[] gaussianRandomRowBytes = getGaussianRandomRowBytes();
499      scanner.seekTo(createCell(gaussianRandomRowBytes));
500      for (int ii = 0; ii < 30; ii++) {
501        if (!scanner.next()) {
502          LOG.info("NOTHING FOLLOWS");
503          return;
504        }
505        // TODO: Fix. Make scanner do Cells.
506        scanner.getCell();
507      }
508    }
509
510    private byte[] getGaussianRandomRowBytes() {
511      int r = (int) randomData.nextGaussian((double) totalRows / 2.0, (double) totalRows / 10.0);
512      // make sure r falls into [0,totalRows)
513      return format(Math.min(totalRows, Math.max(r, 0)));
514    }
515  }
516
517  /**
518   *   */
519  public static void main(String[] args) throws Exception {
520    new HFilePerformanceEvaluation().runBenchmarks();
521  }
522
523  private String getCipherName(Configuration conf, String cipherName) {
524    if (cipherName.equals("aes")) {
525      String provider = conf.get(HConstants.CRYPTO_CIPHERPROVIDER_CONF_KEY);
526      if (
527        provider == null || provider.equals("")
528          || provider.equals(DefaultCipherProvider.class.getName())
529      ) {
530        return "aes-default";
531      } else if (provider.equals(CryptoCipherProvider.class.getName())) {
532        return "aes-commons";
533      }
534    }
535    return cipherName;
536  }
537}