001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress;
019
020import edu.umd.cs.findbugs.annotations.Nullable;
021import java.io.BufferedInputStream;
022import java.io.BufferedOutputStream;
023import java.io.Closeable;
024import java.io.FilterOutputStream;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.OutputStream;
028import org.apache.hadoop.conf.Configurable;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.io.HeapSize;
032import org.apache.hadoop.hbase.nio.ByteBuff;
033import org.apache.hadoop.io.compress.CompressionCodec;
034import org.apache.hadoop.io.compress.CompressionInputStream;
035import org.apache.hadoop.io.compress.CompressionOutputStream;
036import org.apache.hadoop.io.compress.Compressor;
037import org.apache.hadoop.io.compress.Decompressor;
038import org.apache.hadoop.io.compress.DoNotPool;
039import org.apache.hadoop.util.ReflectionUtils;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * Compression related stuff. Copied from hadoop-3315 tfile.
046 */
047@InterfaceAudience.Private
048public final class Compression {
  private static final Logger LOG = LoggerFactory.getLogger(Compression.class);

  // For each algorithm below: a configuration key naming the codec implementation
  // class, and the default implementation used when the key is unset.

  // LZO

  public static final String LZO_CODEC_CLASS_KEY = "hbase.io.compress.lzo.codec";
  public static final String LZO_CODEC_CLASS_DEFAULT = "com.hadoop.compression.lzo.LzoCodec";

  // GZ

  public static final String GZ_CODEC_CLASS_KEY = "hbase.io.compress.gz.codec";
  // Our ReusableStreamGzipCodec fixes an inefficiency in Hadoop's Gzip codec, allowing us to
  // reuse compression streams, but still requires the Hadoop native codec.
  public static final String GZ_CODEC_CLASS_DEFAULT =
    "org.apache.hadoop.hbase.io.compress.ReusableStreamGzipCodec";

  // SNAPPY

  public static final String SNAPPY_CODEC_CLASS_KEY = "hbase.io.compress.snappy.codec";
  public static final String SNAPPY_CODEC_CLASS_DEFAULT =
    "org.apache.hadoop.io.compress.SnappyCodec";

  // LZ4

  public static final String LZ4_CODEC_CLASS_KEY = "hbase.io.compress.lz4.codec";
  public static final String LZ4_CODEC_CLASS_DEFAULT = "org.apache.hadoop.io.compress.Lz4Codec";

  // ZSTD

  public static final String ZSTD_CODEC_CLASS_KEY = "hbase.io.compress.zstd.codec";
  public static final String ZSTD_CODEC_CLASS_DEFAULT =
    "org.apache.hadoop.io.compress.ZStandardCodec";

  // BZIP2

  public static final String BZIP2_CODEC_CLASS_KEY = "hbase.io.compress.bzip2.codec";
  public static final String BZIP2_CODEC_CLASS_DEFAULT = "org.apache.hadoop.io.compress.BZip2Codec";

  // LZMA

  public static final String LZMA_CODEC_CLASS_KEY = "hbase.io.compress.lzma.codec";
  public static final String LZMA_CODEC_CLASS_DEFAULT =
    "org.apache.hadoop.hbase.io.compress.xz.LzmaCodec";

  // Brotli

  public static final String BROTLI_CODEC_CLASS_KEY = "hbase.io.compress.brotli.codec";
  public static final String BROTLI_CODEC_CLASS_DEFAULT =
    "org.apache.hadoop.hbase.io.compress.brotli.BrotliCodec";

  /**
   * Prevent the instantiation of class.
   */
  private Compression() {
    super();
  }
104
  /**
   * Wraps a {@link CompressionOutputStream} so that {@code flush()} finishes the current
   * compression block, flushes the compressed bytes downstream, and then resets the codec
   * state, allowing the same compression stream to be reused after a flush.
   */
  static class FinishOnFlushCompressionStream extends FilterOutputStream {
    public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
      super(cout);
    }

    @Override
    public void write(byte b[], int off, int len) throws IOException {
      // Delegate the whole range directly; FilterOutputStream's default
      // implementation would write one byte at a time.
      out.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
      CompressionOutputStream cout = (CompressionOutputStream) out;
      // Order matters: finish the compression block, push the compressed bytes
      // downstream, then reset so the stream can accept further writes.
      cout.finish();
      cout.flush();
      cout.resetState();
    }
  }
123
124  /**
125   * Returns the classloader to load the Codec class from.
126   */
127  private static ClassLoader getClassLoaderForCodec() {
128    ClassLoader cl = Thread.currentThread().getContextClassLoader();
129    if (cl == null) {
130      cl = Compression.class.getClassLoader();
131    }
132    if (cl == null) {
133      cl = ClassLoader.getSystemClassLoader();
134    }
135    if (cl == null) {
136      throw new RuntimeException("A ClassLoader to load the Codec could not be determined");
137    }
138    return cl;
139  }
140
141  /**
142   * Compression algorithms. The ordinal of these cannot change or else you risk breaking all
143   * existing HFiles out there. Even the ones that are not compressed! (They use the NONE algorithm)
144   */
145  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "SE_TRANSIENT_FIELD_NOT_RESTORED",
146      justification = "We are not serializing so doesn't apply (not sure why transient though)")
147  @SuppressWarnings("ImmutableEnumChecker")
148  @InterfaceAudience.Public
149  public static enum Algorithm {
150    // LZO is GPL and requires extra install to setup. See
151    // https://stackoverflow.com/questions/23441142/class-com-hadoop-compression-lzo-lzocodec-not-found-for-spark-on-cdh-5
152    LZO("lzo", LZO_CODEC_CLASS_KEY, LZO_CODEC_CLASS_DEFAULT) {
153      // Use base type to avoid compile-time dependencies.
154      private volatile transient CompressionCodec lzoCodec;
155      private final transient Object lock = new Object();
156
157      @Override
158      CompressionCodec getCodec(Configuration conf) {
159        if (lzoCodec == null) {
160          synchronized (lock) {
161            if (lzoCodec == null) {
162              lzoCodec = buildCodec(conf, this);
163            }
164          }
165        }
166        return lzoCodec;
167      }
168
169      @Override
170      public CompressionCodec reload(Configuration conf) {
171        synchronized (lock) {
172          lzoCodec = buildCodec(conf, this);
173          LOG.warn("Reloaded configuration for {}", name());
174          return lzoCodec;
175        }
176      }
177    },
178
179    GZ("gz", GZ_CODEC_CLASS_KEY, GZ_CODEC_CLASS_DEFAULT) {
180      private volatile transient CompressionCodec gzCodec;
181      private final transient Object lock = new Object();
182
183      @Override
184      CompressionCodec getCodec(Configuration conf) {
185        if (gzCodec == null) {
186          synchronized (lock) {
187            if (gzCodec == null) {
188              gzCodec = buildCodec(conf, this);
189            }
190          }
191        }
192        return gzCodec;
193      }
194
195      @Override
196      public CompressionCodec reload(Configuration conf) {
197        synchronized (lock) {
198          gzCodec = buildCodec(conf, this);
199          LOG.warn("Reloaded configuration for {}", name());
200          return gzCodec;
201        }
202      }
203    },
204
205    NONE("none", "", "") {
206      @Override
207      CompressionCodec getCodec(Configuration conf) {
208        return null;
209      }
210
211      @Override
212      public CompressionCodec reload(Configuration conf) {
213        return null;
214      }
215
216      @Override
217      public synchronized InputStream createDecompressionStream(InputStream downStream,
218        Decompressor decompressor, int downStreamBufferSize) throws IOException {
219        if (downStreamBufferSize > 0) {
220          return new BufferedInputStream(downStream, downStreamBufferSize);
221        }
222        return downStream;
223      }
224
225      @Override
226      public synchronized OutputStream createCompressionStream(OutputStream downStream,
227        Compressor compressor, int downStreamBufferSize) throws IOException {
228        if (downStreamBufferSize > 0) {
229          return new BufferedOutputStream(downStream, downStreamBufferSize);
230        }
231
232        return downStream;
233      }
234    },
235    SNAPPY("snappy", SNAPPY_CODEC_CLASS_KEY, SNAPPY_CODEC_CLASS_DEFAULT) {
236      // Use base type to avoid compile-time dependencies.
237      private volatile transient CompressionCodec snappyCodec;
238      private final transient Object lock = new Object();
239
240      @Override
241      CompressionCodec getCodec(Configuration conf) {
242        if (snappyCodec == null) {
243          synchronized (lock) {
244            if (snappyCodec == null) {
245              snappyCodec = buildCodec(conf, this);
246            }
247          }
248        }
249        return snappyCodec;
250      }
251
252      @Override
253      public CompressionCodec reload(Configuration conf) {
254        synchronized (lock) {
255          snappyCodec = buildCodec(conf, this);
256          LOG.warn("Reloaded configuration for {}", name());
257          return snappyCodec;
258        }
259      }
260    },
261    LZ4("lz4", LZ4_CODEC_CLASS_KEY, LZ4_CODEC_CLASS_DEFAULT) {
262      // Use base type to avoid compile-time dependencies.
263      private volatile transient CompressionCodec lz4Codec;
264      private final transient Object lock = new Object();
265
266      @Override
267      CompressionCodec getCodec(Configuration conf) {
268        if (lz4Codec == null) {
269          synchronized (lock) {
270            if (lz4Codec == null) {
271              lz4Codec = buildCodec(conf, this);
272            }
273          }
274        }
275        return lz4Codec;
276      }
277
278      @Override
279      public CompressionCodec reload(Configuration conf) {
280        synchronized (lock) {
281          lz4Codec = buildCodec(conf, this);
282          LOG.warn("Reloaded configuration for {}", name());
283          return lz4Codec;
284        }
285      }
286    },
287    BZIP2("bzip2", BZIP2_CODEC_CLASS_KEY, BZIP2_CODEC_CLASS_DEFAULT) {
288      // Use base type to avoid compile-time dependencies.
289      private volatile transient CompressionCodec bzipCodec;
290      private final transient Object lock = new Object();
291
292      @Override
293      CompressionCodec getCodec(Configuration conf) {
294        if (bzipCodec == null) {
295          synchronized (lock) {
296            if (bzipCodec == null) {
297              bzipCodec = buildCodec(conf, this);
298            }
299          }
300        }
301        return bzipCodec;
302      }
303
304      @Override
305      public CompressionCodec reload(Configuration conf) {
306        synchronized (lock) {
307          bzipCodec = buildCodec(conf, this);
308          LOG.warn("Reloaded configuration for {}", name());
309          return bzipCodec;
310        }
311      }
312    },
313    ZSTD("zstd", ZSTD_CODEC_CLASS_KEY, ZSTD_CODEC_CLASS_DEFAULT) {
314      // Use base type to avoid compile-time dependencies.
315      private volatile transient CompressionCodec zStandardCodec;
316      private final transient Object lock = new Object();
317
318      @Override
319      CompressionCodec getCodec(Configuration conf) {
320        if (zStandardCodec == null) {
321          synchronized (lock) {
322            if (zStandardCodec == null) {
323              zStandardCodec = buildCodec(conf, this);
324            }
325          }
326        }
327        return zStandardCodec;
328      }
329
330      @Override
331      public CompressionCodec reload(Configuration conf) {
332        synchronized (lock) {
333          zStandardCodec = buildCodec(conf, this);
334          LOG.warn("Reloaded configuration for {}", name());
335          return zStandardCodec;
336        }
337      }
338    },
339    LZMA("lzma", LZMA_CODEC_CLASS_KEY, LZMA_CODEC_CLASS_DEFAULT) {
340      // Use base type to avoid compile-time dependencies.
341      private volatile transient CompressionCodec lzmaCodec;
342      private final transient Object lock = new Object();
343
344      @Override
345      CompressionCodec getCodec(Configuration conf) {
346        if (lzmaCodec == null) {
347          synchronized (lock) {
348            if (lzmaCodec == null) {
349              lzmaCodec = buildCodec(conf, this);
350            }
351          }
352        }
353        return lzmaCodec;
354      }
355
356      @Override
357      public CompressionCodec reload(Configuration conf) {
358        synchronized (lock) {
359          lzmaCodec = buildCodec(conf, this);
360          LOG.warn("Reloaded configuration for {}", name());
361          return lzmaCodec;
362        }
363      }
364    },
365
366    BROTLI("brotli", BROTLI_CODEC_CLASS_KEY, BROTLI_CODEC_CLASS_DEFAULT) {
367      // Use base type to avoid compile-time dependencies.
368      private volatile transient CompressionCodec brotliCodec;
369      private final transient Object lock = new Object();
370
371      @Override
372      CompressionCodec getCodec(Configuration conf) {
373        if (brotliCodec == null) {
374          synchronized (lock) {
375            if (brotliCodec == null) {
376              brotliCodec = buildCodec(conf, this);
377            }
378          }
379        }
380        return brotliCodec;
381      }
382
383      @Override
384      public CompressionCodec reload(Configuration conf) {
385        synchronized (lock) {
386          brotliCodec = buildCodec(conf, this);
387          LOG.warn("Reloaded configuration for {}", name());
388          return brotliCodec;
389        }
390      }
391    };
392
    // Per-algorithm configuration handed to getCodec(); built once in the constructor.
    private final Configuration conf;
    // Short algorithm name, e.g. "gz"; see getName().
    private final String compressName;
    // Configuration key naming the codec class, and the default class name.
    private final String confKey;
    private final String confDefault;
    /** data input buffer size to absorb small reads from application. */
    private static final int DATA_IBUF_SIZE = 1 * 1024;
    /** data output buffer size to absorb small writes from application. */
    private static final int DATA_OBUF_SIZE = 4 * 1024;

    Algorithm(String name, String confKey, String confDefault) {
      this.conf = HBaseConfiguration.create();
      // Ask Hadoop to use native codec implementations where available.
      this.conf.setBoolean("io.native.lib.available", true);
      this.compressName = name;
      this.confKey = confKey;
      this.confDefault = confDefault;
    }
409
    /**
     * Returns the codec for this algorithm, lazily building it from the class named by this
     * algorithm's configuration key on first use; {@code null} for {@link #NONE}.
     */
    abstract CompressionCodec getCodec(Configuration conf);

    /**
     * Reload configuration for the given algorithm.
     * <p>
     * NOTE: Experts only. This can only be done safely during process startup, before the
     * algorithm's codecs are in use. If the codec implementation is changed, the new implementation
     * may not be fully compatible with what was loaded at static initialization time, leading to
     * potential data corruption. Mostly used by unit tests.
     * @param conf configuration
     */
    public abstract CompressionCodec reload(Configuration conf);
422
423    public InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor,
424      int downStreamBufferSize) throws IOException {
425      CompressionCodec codec = getCodec(conf);
426      // Set the internal buffer size to read from down stream.
427      if (downStreamBufferSize > 0) {
428        ((Configurable) codec).getConf().setInt("io.file.buffer.size", downStreamBufferSize);
429      }
430      CompressionInputStream cis = codec.createInputStream(downStream, decompressor);
431      BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
432      return bis2;
433
434    }
435
436    public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
437      int downStreamBufferSize) throws IOException {
438      OutputStream bos1 = null;
439      if (downStreamBufferSize > 0) {
440        bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
441      } else {
442        bos1 = downStream;
443      }
444      CompressionOutputStream cos = createPlainCompressionStream(bos1, compressor);
445      BufferedOutputStream bos2 =
446        new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
447      return bos2;
448    }
449
450    /**
451     * Creates a compression stream without any additional wrapping into buffering streams.
452     */
453    public CompressionOutputStream createPlainCompressionStream(OutputStream downStream,
454      Compressor compressor) throws IOException {
455      CompressionCodec codec = getCodec(conf);
456      ((Configurable) codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
457      return codec.createOutputStream(downStream, compressor);
458    }
459
460    public Compressor getCompressor() {
461      CompressionCodec codec = getCodec(conf);
462      if (codec != null) {
463        Compressor compressor = CodecPool.getCompressor(codec);
464        if (LOG.isTraceEnabled()) LOG.trace("Retrieved compressor " + compressor + " from pool.");
465        if (compressor != null) {
466          if (compressor.finished()) {
467            // Somebody returns the compressor to CodecPool but is still using it.
468            LOG.warn("Compressor obtained from CodecPool is already finished()");
469          }
470          compressor.reset();
471        }
472        return compressor;
473      }
474      return null;
475    }
476
477    public void returnCompressor(Compressor compressor) {
478      if (compressor != null) {
479        if (LOG.isTraceEnabled()) LOG.trace("Returning compressor " + compressor + " to pool.");
480        CodecPool.returnCompressor(compressor);
481      }
482    }
483
484    public Decompressor getDecompressor() {
485      CompressionCodec codec = getCodec(conf);
486      if (codec != null) {
487        Decompressor decompressor = CodecPool.getDecompressor(codec);
488        if (LOG.isTraceEnabled())
489          LOG.trace("Retrieved decompressor " + decompressor + " from pool.");
490        if (decompressor != null) {
491          if (decompressor.finished()) {
492            // Somebody returns the decompressor to CodecPool but is still using it.
493            LOG.warn("Decompressor {} obtained from CodecPool is already finished", decompressor);
494          }
495          decompressor.reset();
496        }
497        return decompressor;
498      }
499
500      return null;
501    }
502
503    public void returnDecompressor(Decompressor decompressor) {
504      if (decompressor != null) {
505        if (LOG.isTraceEnabled()) LOG.trace("Returning decompressor " + decompressor + " to pool.");
506        CodecPool.returnDecompressor(decompressor);
507        if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
508          if (LOG.isTraceEnabled()) LOG.trace("Ending decompressor " + decompressor);
509          decompressor.end();
510        }
511      }
512    }
513
514    /**
515     * Signals if this codec theoretically supports decompression on {@link ByteBuff}s. This can be
516     * faster than using a DecompressionStream. If this method returns true, you can call
517     * {@link #getByteBuffDecompressor()} to obtain a {@link ByteBuffDecompressor}. You must then
518     * also call {@link ByteBuffDecompressor#canDecompress(ByteBuff, ByteBuff)} before attempting
519     * decompression, to verify if that decompressor is capable of handling your particular input
520     * and output buffers.
521     */
522    public boolean supportsByteBuffDecompression() {
523      CompressionCodec codec = getCodec(conf);
524      return codec instanceof ByteBuffDecompressionCodec;
525    }
526
527    /**
528     * Be sure to call {@link #supportsByteBuffDecompression()} before calling this method.
529     * @throws IllegalStateException if the codec does not support block decompression
530     */
531    public ByteBuffDecompressor getByteBuffDecompressor() {
532      CompressionCodec codec = getCodec(conf);
533      if (codec instanceof ByteBuffDecompressionCodec) {
534        ByteBuffDecompressor decompressor =
535          CodecPool.getByteBuffDecompressor((ByteBuffDecompressionCodec) codec);
536        if (LOG.isTraceEnabled()) {
537          LOG.trace("Retrieved decompressor {} from pool.", decompressor);
538        }
539        return decompressor;
540      } else {
541        throw new IllegalStateException("Codec " + codec + " does not support block decompression");
542      }
543    }
544
545    public void returnByteBuffDecompressor(ByteBuffDecompressor decompressor) {
546      if (decompressor != null) {
547        if (LOG.isTraceEnabled()) {
548          LOG.trace("Returning decompressor {} to pool.", decompressor);
549        }
550        CodecPool.returnByteBuffDecompressor(decompressor);
551      }
552    }
553
554    /**
555     * Get an object that holds settings used by ByteBuffDecompressor. It's expensive to pull these
556     * from a Configuration object every time we decompress a block, so pull them here when, for
557     * example, opening an HFile, and reuse the returned HFileDecompressionContext as much as
558     * possible. The concrete class of this object will be one that is specific to the codec
559     * implementation in use. You don't need to inspect it yourself, just pass it along to
560     * {@link ByteBuffDecompressor#reinit(HFileDecompressionContext)}.
561     */
562    @Nullable
563    public HFileDecompressionContext
564      getHFileDecompressionContextForConfiguration(Configuration conf) {
565      if (supportsByteBuffDecompression()) {
566        return ((ByteBuffDecompressionCodec) getCodec(conf))
567          .getDecompressionContextFromConfiguration(conf);
568      } else {
569        return null;
570      }
571    }
572
    /** Returns the short name of this compression algorithm, e.g. {@code "gz"}. */
    public String getName() {
      return compressName;
    }
576  }
577
578  /**
579   * See {@link Algorithm#getHFileDecompressionContextForConfiguration(Configuration)}.
580   */
581  public static abstract class HFileDecompressionContext implements Closeable, HeapSize {
582  }
583
584  public static Algorithm getCompressionAlgorithmByName(String compressName) {
585    Algorithm[] algos = Algorithm.class.getEnumConstants();
586
587    for (Algorithm a : algos) {
588      if (a.getName().equals(compressName)) {
589        return a;
590      }
591    }
592
593    throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
594  }
595
596  /**
597   * Get names of supported compression algorithms.
598   * @return Array of strings, each represents a supported compression algorithm. Currently, the
599   *         following compression algorithms are supported.
600   */
601  public static String[] getSupportedAlgorithms() {
602    Algorithm[] algos = Algorithm.class.getEnumConstants();
603
604    String[] ret = new String[algos.length];
605    int i = 0;
606    for (Algorithm a : algos) {
607      ret[i++] = a.getName();
608    }
609
610    return ret;
611  }
612
613  /**
614   * Load a codec implementation for an algorithm using the supplied configuration.
615   * @param conf the configuration to use
616   * @param algo the algorithm to implement
617   */
618  private static CompressionCodec buildCodec(final Configuration conf, final Algorithm algo) {
619    try {
620      String codecClassName = conf.get(algo.confKey, algo.confDefault);
621      if (codecClassName == null) {
622        throw new RuntimeException("No codec configured for " + algo.confKey);
623      }
624      Class<?> codecClass = getClassLoaderForCodec().loadClass(codecClassName);
625      // The class is from hadoop so we use hadoop's ReflectionUtils to create it
626      CompressionCodec codec =
627        (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration(conf));
628      LOG.info("Loaded codec {} for compression algorithm {}", codec.getClass().getCanonicalName(),
629        algo.name());
630      return codec;
631    } catch (ClassNotFoundException e) {
632      throw new RuntimeException(e);
633    }
634  }
635
636  public static void main(String[] args) throws Exception {
637    Configuration conf = HBaseConfiguration.create();
638    java.util.Map<String, CompressionCodec> implMap = new java.util.HashMap<>();
639    for (Algorithm algo : Algorithm.class.getEnumConstants()) {
640      try {
641        implMap.put(algo.name(), algo.getCodec(conf));
642      } catch (Exception e) {
643        // Ignore failures to load codec native implementations while building the report.
644        // We are to report what is configured.
645      }
646    }
647    for (Algorithm algo : Algorithm.class.getEnumConstants()) {
648      System.out.println(algo.name() + ":");
649      System.out.println("    name: " + algo.getName());
650      System.out.println("    confKey: " + algo.confKey);
651      System.out.println("    confDefault: " + algo.confDefault);
652      CompressionCodec codec = implMap.get(algo.name());
653      System.out.println(
654        "    implClass: " + (codec != null ? codec.getClass().getCanonicalName() : "<none>"));
655    }
656  }
657
658}