001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.concurrent.ThreadLocalRandom;
022import org.apache.commons.math3.random.RandomData;
023import org.apache.commons.math3.random.RandomDataImpl;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
028import org.apache.hadoop.hbase.io.crypto.CryptoCipherProvider;
029import org.apache.hadoop.hbase.io.crypto.DefaultCipherProvider;
030import org.apache.hadoop.hbase.io.crypto.Encryption;
031import org.apache.hadoop.hbase.io.crypto.MockAesKeyProvider;
032import org.apache.hadoop.hbase.io.crypto.aes.AES;
033import org.apache.hadoop.hbase.io.hfile.CacheConfig;
034import org.apache.hadoop.hbase.io.hfile.HFile;
035import org.apache.hadoop.hbase.io.hfile.HFileContext;
036import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
037import org.apache.hadoop.hbase.io.hfile.HFileScanner;
038import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * This class runs performance benchmarks for {@link HFile}.
047 */
048@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
049public class HFilePerformanceEvaluation {
050  private static final int ROW_LENGTH = 10;
051  private static final int ROW_COUNT = 1000000;
052  private static final int RFILE_BLOCKSIZE = 8 * 1024;
053  private static StringBuilder testSummary = new StringBuilder();
054
055  // Disable verbose INFO logging from org.apache.hadoop.hbase.io.compress.CodecPool
056  static {
057    System.setProperty("org.apache.commons.logging.Log",
058      "org.apache.commons.logging.impl.SimpleLog");
059    System.setProperty(
060      "org.apache.commons.logging.simplelog.log.org.apache.hadoop.hbase.io.compress.CodecPool",
061      "WARN");
062  }
063
064  private static final Logger LOG =
065    LoggerFactory.getLogger(HFilePerformanceEvaluation.class.getName());
066
067  static byte[] format(final int i) {
068    String v = Integer.toString(i);
069    return Bytes.toBytes("0000000000".substring(v.length()) + v);
070  }
071
072  static ImmutableBytesWritable format(final int i, ImmutableBytesWritable w) {
073    w.set(format(i));
074    return w;
075  }
076
077  static ExtendedCell createCell(final int i) {
078    return createCell(i, HConstants.EMPTY_BYTE_ARRAY);
079  }
080
081  /**
082   * HFile is Cell-based. It used to be byte arrays. Doing this test, pass Cells. All Cells
083   * intentionally have same coordinates in all fields but row.
084   * @param i     Integer to format as a row Key.
085   * @param value Value to use
086   * @return Created Cell.
087   */
088  static ExtendedCell createCell(final int i, final byte[] value) {
089    return createCell(format(i), value);
090  }
091
092  static ExtendedCell createCell(final byte[] keyRow) {
093    return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(keyRow)
094      .setFamily(HConstants.EMPTY_BYTE_ARRAY).setQualifier(HConstants.EMPTY_BYTE_ARRAY)
095      .setTimestamp(HConstants.LATEST_TIMESTAMP).setType(KeyValue.Type.Maximum.getCode())
096      .setValue(HConstants.EMPTY_BYTE_ARRAY).build();
097  }
098
099  static ExtendedCell createCell(final byte[] keyRow, final byte[] value) {
100    return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(keyRow)
101      .setFamily(HConstants.EMPTY_BYTE_ARRAY).setQualifier(HConstants.EMPTY_BYTE_ARRAY)
102      .setTimestamp(HConstants.LATEST_TIMESTAMP).setType(KeyValue.Type.Maximum.getCode())
103      .setValue(value).build();
104  }
105
106  /**
107   * Add any supported codec or cipher to test the HFile read/write performance. Specify "none" to
108   * disable codec or cipher or both.
109   */
110  private void runBenchmarks() throws Exception {
111    final Configuration conf = new Configuration();
112    final FileSystem fs = FileSystem.get(conf);
113    final Path mf = fs.makeQualified(new Path("performanceevaluation.mapfile"));
114
115    // codec=none cipher=none
116    runWriteBenchmark(conf, fs, mf, "none", "none");
117    runReadBenchmark(conf, fs, mf, "none", "none");
118
119    // codec=gz cipher=none
120    runWriteBenchmark(conf, fs, mf, "gz", "none");
121    runReadBenchmark(conf, fs, mf, "gz", "none");
122
123    // Add configuration for AES cipher
124    final Configuration aesconf = new Configuration();
125    aesconf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, MockAesKeyProvider.class.getName());
126    aesconf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
127    aesconf.setInt("hfile.format.version", 3);
128    final FileSystem aesfs = FileSystem.get(aesconf);
129    final Path aesmf = aesfs.makeQualified(new Path("performanceevaluation.aes.mapfile"));
130
131    // codec=none cipher=aes
132    runWriteBenchmark(aesconf, aesfs, aesmf, "none", "aes");
133    runReadBenchmark(aesconf, aesfs, aesmf, "none", "aes");
134
135    // codec=gz cipher=aes
136    runWriteBenchmark(aesconf, aesfs, aesmf, "gz", "aes");
137    runReadBenchmark(aesconf, aesfs, aesmf, "gz", "aes");
138
139    // Add configuration for Commons cipher
140    final Configuration cryptoconf = new Configuration();
141    cryptoconf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, MockAesKeyProvider.class.getName());
142    cryptoconf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
143    cryptoconf.setInt("hfile.format.version", 3);
144    cryptoconf.set(HConstants.CRYPTO_CIPHERPROVIDER_CONF_KEY, CryptoCipherProvider.class.getName());
145    final FileSystem cryptofs = FileSystem.get(cryptoconf);
146    final Path cryptof = cryptofs.makeQualified(new Path("performanceevaluation.aes.mapfile"));
147
148    // codec=none cipher=aes
149    runWriteBenchmark(cryptoconf, cryptofs, aesmf, "none", "aes");
150    runReadBenchmark(cryptoconf, cryptofs, aesmf, "none", "aes");
151
152    // codec=gz cipher=aes
153    runWriteBenchmark(cryptoconf, aesfs, aesmf, "gz", "aes");
154    runReadBenchmark(cryptoconf, aesfs, aesmf, "gz", "aes");
155
156    // cleanup test files
157    if (fs.exists(mf)) {
158      fs.delete(mf, true);
159    }
160    if (aesfs.exists(aesmf)) {
161      aesfs.delete(aesmf, true);
162    }
163    if (cryptofs.exists(aesmf)) {
164      cryptofs.delete(cryptof, true);
165    }
166
167    // Print Result Summary
168    LOG.info("\n***************\n" + "Result Summary" + "\n***************\n");
169    LOG.info(testSummary.toString());
170
171  }
172
173  /**
174   * Write a test HFile with the given codec & cipher
175   * @param codec  "none", "lzo", "gz", "snappy"
176   * @param cipher "none", "aes"
177   */
178  private void runWriteBenchmark(Configuration conf, FileSystem fs, Path mf, String codec,
179    String cipher) throws Exception {
180    if (fs.exists(mf)) {
181      fs.delete(mf, true);
182    }
183
184    runBenchmark(new SequentialWriteBenchmark(conf, fs, mf, ROW_COUNT, codec, cipher), ROW_COUNT,
185      codec, getCipherName(conf, cipher));
186
187  }
188
189  /**
190   * Run all the read benchmarks for the test HFile
191   * @param codec  "none", "lzo", "gz", "snappy"
192   * @param cipher "none", "aes"
193   */
194  private void runReadBenchmark(final Configuration conf, final FileSystem fs, final Path mf,
195    final String codec, final String cipher) {
196    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
197      @Override
198      public void run() {
199        try {
200          runBenchmark(new UniformRandomSmallScan(conf, fs, mf, ROW_COUNT), ROW_COUNT, codec,
201            getCipherName(conf, cipher));
202        } catch (Exception e) {
203          testSummary.append("UniformRandomSmallScan failed " + e.getMessage());
204          e.printStackTrace();
205        }
206      }
207    });
208
209    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
210      @Override
211      public void run() {
212        try {
213          runBenchmark(new UniformRandomReadBenchmark(conf, fs, mf, ROW_COUNT), ROW_COUNT, codec,
214            getCipherName(conf, cipher));
215        } catch (Exception e) {
216          testSummary.append("UniformRandomReadBenchmark failed " + e.getMessage());
217          e.printStackTrace();
218        }
219      }
220    });
221
222    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
223      @Override
224      public void run() {
225        try {
226          runBenchmark(new GaussianRandomReadBenchmark(conf, fs, mf, ROW_COUNT), ROW_COUNT, codec,
227            getCipherName(conf, cipher));
228        } catch (Exception e) {
229          testSummary.append("GaussianRandomReadBenchmark failed " + e.getMessage());
230          e.printStackTrace();
231        }
232      }
233    });
234
235    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
236      @Override
237      public void run() {
238        try {
239          runBenchmark(new SequentialReadBenchmark(conf, fs, mf, ROW_COUNT), ROW_COUNT, codec,
240            getCipherName(conf, cipher));
241        } catch (Exception e) {
242          testSummary.append("SequentialReadBenchmark failed " + e.getMessage());
243          e.printStackTrace();
244        }
245      }
246    });
247
248  }
249
250  protected void runBenchmark(RowOrientedBenchmark benchmark, int rowCount, String codec,
251    String cipher) throws Exception {
252    LOG.info("Running " + benchmark.getClass().getSimpleName() + " with codec[" + codec + "] "
253      + "cipher[" + cipher + "] for " + rowCount + " rows.");
254
255    long elapsedTime = benchmark.run();
256
257    LOG.info("Running " + benchmark.getClass().getSimpleName() + " with codec[" + codec + "] "
258      + "cipher[" + cipher + "] for " + rowCount + " rows took " + elapsedTime + "ms.");
259
260    // Store results to print summary at the end
261    testSummary.append("Running ").append(benchmark.getClass().getSimpleName())
262      .append(" with codec[").append(codec).append("] cipher[").append(cipher).append("] for ")
263      .append(rowCount).append(" rows took ").append(elapsedTime).append("ms.").append("\n");
264  }
265
266  static abstract class RowOrientedBenchmark {
267
268    protected final Configuration conf;
269    protected final FileSystem fs;
270    protected final Path mf;
271    protected final int totalRows;
272    protected String codec = "none";
273    protected String cipher = "none";
274
275    public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows,
276      String codec, String cipher) {
277      this.conf = conf;
278      this.fs = fs;
279      this.mf = mf;
280      this.totalRows = totalRows;
281      this.codec = codec;
282      this.cipher = cipher;
283    }
284
285    public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows) {
286      this.conf = conf;
287      this.fs = fs;
288      this.mf = mf;
289      this.totalRows = totalRows;
290    }
291
292    void setUp() throws Exception {
293      // do nothing
294    }
295
296    abstract void doRow(int i) throws Exception;
297
298    protected int getReportingPeriod() {
299      return this.totalRows / 10;
300    }
301
302    void tearDown() throws Exception {
303      // do nothing
304    }
305
306    /**
307     * Run benchmark
308     * @return elapsed time.
309     */
310    long run() throws Exception {
311      long elapsedTime;
312      setUp();
313      long startTime = EnvironmentEdgeManager.currentTime();
314      try {
315        for (int i = 0; i < totalRows; i++) {
316          if (i > 0 && i % getReportingPeriod() == 0) {
317            LOG.info("Processed " + i + " rows.");
318          }
319          doRow(i);
320        }
321        elapsedTime = EnvironmentEdgeManager.currentTime() - startTime;
322      } finally {
323        tearDown();
324      }
325      return elapsedTime;
326    }
327
328  }
329
330  static class SequentialWriteBenchmark extends RowOrientedBenchmark {
331    protected HFile.Writer writer;
332    private byte[] bytes = new byte[ROW_LENGTH];
333
334    public SequentialWriteBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows,
335      String codec, String cipher) {
336      super(conf, fs, mf, totalRows, codec, cipher);
337    }
338
339    @Override
340    void setUp() throws Exception {
341
342      HFileContextBuilder builder = new HFileContextBuilder()
343        .withCompression(HFileWriterImpl.compressionByName(codec)).withBlockSize(RFILE_BLOCKSIZE);
344
345      if (cipher == "aes") {
346        byte[] cipherKey = new byte[AES.KEY_LENGTH];
347        Bytes.secureRandom(cipherKey);
348        builder.withEncryptionContext(Encryption.newContext(conf)
349          .setCipher(Encryption.getCipher(conf, cipher)).setKey(cipherKey));
350      } else if (!"none".equals(cipher)) {
351        throw new IOException("Cipher " + cipher + " not supported.");
352      }
353
354      HFileContext hFileContext = builder.build();
355
356      writer =
357        HFile.getWriterFactoryNoCache(conf).withPath(fs, mf).withFileContext(hFileContext).create();
358    }
359
360    @Override
361    void doRow(int i) throws Exception {
362      writer.append(createCell(i, generateValue()));
363    }
364
365    private byte[] generateValue() {
366      Bytes.random(bytes);
367      return bytes;
368    }
369
370    @Override
371    protected int getReportingPeriod() {
372      return this.totalRows; // don't report progress
373    }
374
375    @Override
376    void tearDown() throws Exception {
377      writer.close();
378    }
379
380  }
381
382  static abstract class ReadBenchmark extends RowOrientedBenchmark {
383
384    protected HFile.Reader reader;
385
386    public ReadBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows) {
387      super(conf, fs, mf, totalRows);
388    }
389
390    @Override
391    void setUp() throws Exception {
392      reader = HFile.createReader(this.fs, this.mf, new CacheConfig(this.conf), true, this.conf);
393    }
394
395    @Override
396    void tearDown() throws Exception {
397      reader.close();
398    }
399
400  }
401
402  static class SequentialReadBenchmark extends ReadBenchmark {
403    private HFileScanner scanner;
404
405    public SequentialReadBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows) {
406      super(conf, fs, mf, totalRows);
407    }
408
409    @Override
410    void setUp() throws Exception {
411      super.setUp();
412      this.scanner = this.reader.getScanner(conf, false, false);
413      this.scanner.seekTo();
414    }
415
416    @Override
417    void doRow(int i) throws Exception {
418      if (this.scanner.next()) {
419        // TODO: Fix. Make Scanner do Cells.
420        Cell c = this.scanner.getCell();
421        PerformanceEvaluationCommons.assertKey(format(i + 1), c);
422        PerformanceEvaluationCommons.assertValueSize(ROW_LENGTH, c.getValueLength());
423      }
424    }
425
426    @Override
427    protected int getReportingPeriod() {
428      return this.totalRows; // don't report progress
429    }
430
431  }
432
433  static class UniformRandomReadBenchmark extends ReadBenchmark {
434
435    public UniformRandomReadBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows) {
436      super(conf, fs, mf, totalRows);
437    }
438
439    @Override
440    void doRow(int i) throws Exception {
441      HFileScanner scanner = this.reader.getScanner(conf, false, true);
442      byte[] b = getRandomRow();
443      if (scanner.seekTo(createCell(b)) < 0) {
444        LOG.info("Not able to seekTo " + new String(b));
445        return;
446      }
447      // TODO: Fix scanner so it does Cells
448      Cell c = scanner.getCell();
449      PerformanceEvaluationCommons.assertKey(b, c);
450      PerformanceEvaluationCommons.assertValueSize(ROW_LENGTH, c.getValueLength());
451    }
452
453    private byte[] getRandomRow() {
454      return format(ThreadLocalRandom.current().nextInt(totalRows));
455    }
456  }
457
458  static class UniformRandomSmallScan extends ReadBenchmark {
459
460    public UniformRandomSmallScan(Configuration conf, FileSystem fs, Path mf, int totalRows) {
461      super(conf, fs, mf, totalRows / 10);
462    }
463
464    @Override
465    void doRow(int i) throws Exception {
466      HFileScanner scanner = this.reader.getScanner(conf, false, false);
467      byte[] b = getRandomRow();
468      // System.out.println("Random row: " + new String(b));
469      ExtendedCell c = createCell(b);
470      if (scanner.seekTo(c) != 0) {
471        LOG.info("Nonexistent row: " + new String(b));
472        return;
473      }
474      // TODO: HFileScanner doesn't do Cells yet. Temporary fix.
475      c = scanner.getCell();
476      // System.out.println("Found row: " +
477      // new String(c.getRowArray(), c.getRowOffset(), c.getRowLength()));
478      PerformanceEvaluationCommons.assertKey(b, c);
479      for (int ii = 0; ii < 30; ii++) {
480        if (!scanner.next()) {
481          LOG.info("NOTHING FOLLOWS");
482          return;
483        }
484        c = scanner.getCell();
485        PerformanceEvaluationCommons.assertValueSize(ROW_LENGTH, c.getValueLength());
486      }
487    }
488
489    private byte[] getRandomRow() {
490      return format(ThreadLocalRandom.current().nextInt(totalRows));
491    }
492  }
493
494  static class GaussianRandomReadBenchmark extends ReadBenchmark {
495
496    private RandomData randomData = new RandomDataImpl();
497
498    public GaussianRandomReadBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows) {
499      super(conf, fs, mf, totalRows);
500    }
501
502    @Override
503    void doRow(int i) throws Exception {
504      HFileScanner scanner = this.reader.getScanner(conf, false, true);
505      byte[] gaussianRandomRowBytes = getGaussianRandomRowBytes();
506      scanner.seekTo(createCell(gaussianRandomRowBytes));
507      for (int ii = 0; ii < 30; ii++) {
508        if (!scanner.next()) {
509          LOG.info("NOTHING FOLLOWS");
510          return;
511        }
512        // TODO: Fix. Make scanner do Cells.
513        scanner.getCell();
514      }
515    }
516
517    private byte[] getGaussianRandomRowBytes() {
518      int r = (int) randomData.nextGaussian((double) totalRows / 2.0, (double) totalRows / 10.0);
519      // make sure r falls into [0,totalRows)
520      return format(Math.min(totalRows, Math.max(r, 0)));
521    }
522  }
523
524  /**
525   *   */
526  public static void main(String[] args) throws Exception {
527    new HFilePerformanceEvaluation().runBenchmarks();
528  }
529
530  private String getCipherName(Configuration conf, String cipherName) {
531    if (cipherName.equals("aes")) {
532      String provider = conf.get(HConstants.CRYPTO_CIPHERPROVIDER_CONF_KEY);
533      if (
534        provider == null || provider.equals("")
535          || provider.equals(DefaultCipherProvider.class.getName())
536      ) {
537        return "aes-default";
538      } else if (provider.equals(CryptoCipherProvider.class.getName())) {
539        return "aes-commons";
540      }
541    }
542    return cipherName;
543  }
544}