001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress;
019
020import java.io.BufferedInputStream;
021import java.io.BufferedOutputStream;
022import java.io.FilterOutputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.OutputStream;
026import org.apache.hadoop.conf.Configurable;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HBaseConfiguration;
029import org.apache.hadoop.io.compress.CompressionCodec;
030import org.apache.hadoop.io.compress.CompressionInputStream;
031import org.apache.hadoop.io.compress.CompressionOutputStream;
032import org.apache.hadoop.io.compress.Compressor;
033import org.apache.hadoop.io.compress.Decompressor;
034import org.apache.hadoop.io.compress.DoNotPool;
035import org.apache.hadoop.util.ReflectionUtils;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
/**
 * Compression-related utilities: the supported compression algorithms and the codecs that
 * implement them. Originally copied from the TFile code of HADOOP-3315.
 */
043@InterfaceAudience.Private
044public final class Compression {
  /** Logger for this class. */
  private static final Logger LOG = LoggerFactory.getLogger(Compression.class);

  // LZO

  /** Configuration key naming the codec implementation class for the LZO algorithm. */
  public static final String LZO_CODEC_CLASS_KEY = "hbase.io.compress.lzo.codec";
  /** Codec class name used for LZO when {@link #LZO_CODEC_CLASS_KEY} is not configured. */
  public static final String LZO_CODEC_CLASS_DEFAULT = "com.hadoop.compression.lzo.LzoCodec";

  // GZ

  /** Configuration key naming the codec implementation class for the GZ algorithm. */
  public static final String GZ_CODEC_CLASS_KEY = "hbase.io.compress.gz.codec";
  // Our ReusableStreamGzipCodec fixes an inefficiency in Hadoop's Gzip codec, allowing us to
  // reuse compression streams, but still requires the Hadoop native codec.
  /** Codec class name used for GZ when {@link #GZ_CODEC_CLASS_KEY} is not configured. */
  public static final String GZ_CODEC_CLASS_DEFAULT =
    "org.apache.hadoop.hbase.io.compress.ReusableStreamGzipCodec";

  // SNAPPY

  /** Configuration key naming the codec implementation class for the SNAPPY algorithm. */
  public static final String SNAPPY_CODEC_CLASS_KEY = "hbase.io.compress.snappy.codec";
  /** Codec class name used for SNAPPY when {@link #SNAPPY_CODEC_CLASS_KEY} is not configured. */
  public static final String SNAPPY_CODEC_CLASS_DEFAULT =
    "org.apache.hadoop.io.compress.SnappyCodec";

  // LZ4

  /** Configuration key naming the codec implementation class for the LZ4 algorithm. */
  public static final String LZ4_CODEC_CLASS_KEY = "hbase.io.compress.lz4.codec";
  /** Codec class name used for LZ4 when {@link #LZ4_CODEC_CLASS_KEY} is not configured. */
  public static final String LZ4_CODEC_CLASS_DEFAULT = "org.apache.hadoop.io.compress.Lz4Codec";

  // ZSTD

  /** Configuration key naming the codec implementation class for the ZSTD algorithm. */
  public static final String ZSTD_CODEC_CLASS_KEY = "hbase.io.compress.zstd.codec";
  /** Codec class name used for ZSTD when {@link #ZSTD_CODEC_CLASS_KEY} is not configured. */
  public static final String ZSTD_CODEC_CLASS_DEFAULT =
    "org.apache.hadoop.io.compress.ZStandardCodec";

  // BZIP2

  /** Configuration key naming the codec implementation class for the BZIP2 algorithm. */
  public static final String BZIP2_CODEC_CLASS_KEY = "hbase.io.compress.bzip2.codec";
  /** Codec class name used for BZIP2 when {@link #BZIP2_CODEC_CLASS_KEY} is not configured. */
  public static final String BZIP2_CODEC_CLASS_DEFAULT = "org.apache.hadoop.io.compress.BZip2Codec";

  // LZMA

  /** Configuration key naming the codec implementation class for the LZMA algorithm. */
  public static final String LZMA_CODEC_CLASS_KEY = "hbase.io.compress.lzma.codec";
  /** Codec class name used for LZMA when {@link #LZMA_CODEC_CLASS_KEY} is not configured. */
  public static final String LZMA_CODEC_CLASS_DEFAULT =
    "org.apache.hadoop.hbase.io.compress.xz.LzmaCodec";

  // Brotli

  /** Configuration key naming the codec implementation class for the BROTLI algorithm. */
  public static final String BROTLI_CODEC_CLASS_KEY = "hbase.io.compress.brotli.codec";
  /** Codec class name used for BROTLI when {@link #BROTLI_CODEC_CLASS_KEY} is not configured. */
  public static final String BROTLI_CODEC_CLASS_DEFAULT =
    "org.apache.hadoop.hbase.io.compress.brotli.BrotliCodec";
093
094  /**
095   * Prevent the instantiation of class.
096   */
097  private Compression() {
098    super();
099  }
100
101  static class FinishOnFlushCompressionStream extends FilterOutputStream {
102    public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
103      super(cout);
104    }
105
106    @Override
107    public void write(byte b[], int off, int len) throws IOException {
108      out.write(b, off, len);
109    }
110
111    @Override
112    public void flush() throws IOException {
113      CompressionOutputStream cout = (CompressionOutputStream) out;
114      cout.finish();
115      cout.flush();
116      cout.resetState();
117    }
118  }
119
120  /**
121   * Returns the classloader to load the Codec class from.
122   */
123  private static ClassLoader getClassLoaderForCodec() {
124    ClassLoader cl = Thread.currentThread().getContextClassLoader();
125    if (cl == null) {
126      cl = Compression.class.getClassLoader();
127    }
128    if (cl == null) {
129      cl = ClassLoader.getSystemClassLoader();
130    }
131    if (cl == null) {
132      throw new RuntimeException("A ClassLoader to load the Codec could not be determined");
133    }
134    return cl;
135  }
136
137  /**
138   * Compression algorithms. The ordinal of these cannot change or else you risk breaking all
139   * existing HFiles out there. Even the ones that are not compressed! (They use the NONE algorithm)
140   */
141  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "SE_TRANSIENT_FIELD_NOT_RESTORED",
142      justification = "We are not serializing so doesn't apply (not sure why transient though)")
143  @SuppressWarnings("ImmutableEnumChecker")
144  @InterfaceAudience.Public
145  public static enum Algorithm {
146    // LZO is GPL and requires extra install to setup. See
147    // https://stackoverflow.com/questions/23441142/class-com-hadoop-compression-lzo-lzocodec-not-found-for-spark-on-cdh-5
148    LZO("lzo", LZO_CODEC_CLASS_KEY, LZO_CODEC_CLASS_DEFAULT) {
149      // Use base type to avoid compile-time dependencies.
150      private volatile transient CompressionCodec lzoCodec;
151      private final transient Object lock = new Object();
152
153      @Override
154      CompressionCodec getCodec(Configuration conf) {
155        if (lzoCodec == null) {
156          synchronized (lock) {
157            if (lzoCodec == null) {
158              lzoCodec = buildCodec(conf, this);
159            }
160          }
161        }
162        return lzoCodec;
163      }
164
165      @Override
166      public CompressionCodec reload(Configuration conf) {
167        synchronized (lock) {
168          lzoCodec = buildCodec(conf, this);
169          LOG.warn("Reloaded configuration for {}", name());
170          return lzoCodec;
171        }
172      }
173    },
174
175    GZ("gz", GZ_CODEC_CLASS_KEY, GZ_CODEC_CLASS_DEFAULT) {
176      private volatile transient CompressionCodec gzCodec;
177      private final transient Object lock = new Object();
178
179      @Override
180      CompressionCodec getCodec(Configuration conf) {
181        if (gzCodec == null) {
182          synchronized (lock) {
183            if (gzCodec == null) {
184              gzCodec = buildCodec(conf, this);
185            }
186          }
187        }
188        return gzCodec;
189      }
190
191      @Override
192      public CompressionCodec reload(Configuration conf) {
193        synchronized (lock) {
194          gzCodec = buildCodec(conf, this);
195          LOG.warn("Reloaded configuration for {}", name());
196          return gzCodec;
197        }
198      }
199    },
200
201    NONE("none", "", "") {
202      @Override
203      CompressionCodec getCodec(Configuration conf) {
204        return null;
205      }
206
207      @Override
208      public CompressionCodec reload(Configuration conf) {
209        return null;
210      }
211
212      @Override
213      public synchronized InputStream createDecompressionStream(InputStream downStream,
214        Decompressor decompressor, int downStreamBufferSize) throws IOException {
215        if (downStreamBufferSize > 0) {
216          return new BufferedInputStream(downStream, downStreamBufferSize);
217        }
218        return downStream;
219      }
220
221      @Override
222      public synchronized OutputStream createCompressionStream(OutputStream downStream,
223        Compressor compressor, int downStreamBufferSize) throws IOException {
224        if (downStreamBufferSize > 0) {
225          return new BufferedOutputStream(downStream, downStreamBufferSize);
226        }
227
228        return downStream;
229      }
230    },
231    SNAPPY("snappy", SNAPPY_CODEC_CLASS_KEY, SNAPPY_CODEC_CLASS_DEFAULT) {
232      // Use base type to avoid compile-time dependencies.
233      private volatile transient CompressionCodec snappyCodec;
234      private final transient Object lock = new Object();
235
236      @Override
237      CompressionCodec getCodec(Configuration conf) {
238        if (snappyCodec == null) {
239          synchronized (lock) {
240            if (snappyCodec == null) {
241              snappyCodec = buildCodec(conf, this);
242            }
243          }
244        }
245        return snappyCodec;
246      }
247
248      @Override
249      public CompressionCodec reload(Configuration conf) {
250        synchronized (lock) {
251          snappyCodec = buildCodec(conf, this);
252          LOG.warn("Reloaded configuration for {}", name());
253          return snappyCodec;
254        }
255      }
256    },
257    LZ4("lz4", LZ4_CODEC_CLASS_KEY, LZ4_CODEC_CLASS_DEFAULT) {
258      // Use base type to avoid compile-time dependencies.
259      private volatile transient CompressionCodec lz4Codec;
260      private final transient Object lock = new Object();
261
262      @Override
263      CompressionCodec getCodec(Configuration conf) {
264        if (lz4Codec == null) {
265          synchronized (lock) {
266            if (lz4Codec == null) {
267              lz4Codec = buildCodec(conf, this);
268            }
269          }
270        }
271        return lz4Codec;
272      }
273
274      @Override
275      public CompressionCodec reload(Configuration conf) {
276        synchronized (lock) {
277          lz4Codec = buildCodec(conf, this);
278          LOG.warn("Reloaded configuration for {}", name());
279          return lz4Codec;
280        }
281      }
282    },
283    BZIP2("bzip2", BZIP2_CODEC_CLASS_KEY, BZIP2_CODEC_CLASS_DEFAULT) {
284      // Use base type to avoid compile-time dependencies.
285      private volatile transient CompressionCodec bzipCodec;
286      private final transient Object lock = new Object();
287
288      @Override
289      CompressionCodec getCodec(Configuration conf) {
290        if (bzipCodec == null) {
291          synchronized (lock) {
292            if (bzipCodec == null) {
293              bzipCodec = buildCodec(conf, this);
294            }
295          }
296        }
297        return bzipCodec;
298      }
299
300      @Override
301      public CompressionCodec reload(Configuration conf) {
302        synchronized (lock) {
303          bzipCodec = buildCodec(conf, this);
304          LOG.warn("Reloaded configuration for {}", name());
305          return bzipCodec;
306        }
307      }
308    },
309    ZSTD("zstd", ZSTD_CODEC_CLASS_KEY, ZSTD_CODEC_CLASS_DEFAULT) {
310      // Use base type to avoid compile-time dependencies.
311      private volatile transient CompressionCodec zStandardCodec;
312      private final transient Object lock = new Object();
313
314      @Override
315      CompressionCodec getCodec(Configuration conf) {
316        if (zStandardCodec == null) {
317          synchronized (lock) {
318            if (zStandardCodec == null) {
319              zStandardCodec = buildCodec(conf, this);
320            }
321          }
322        }
323        return zStandardCodec;
324      }
325
326      @Override
327      public CompressionCodec reload(Configuration conf) {
328        synchronized (lock) {
329          zStandardCodec = buildCodec(conf, this);
330          LOG.warn("Reloaded configuration for {}", name());
331          return zStandardCodec;
332        }
333      }
334    },
335    LZMA("lzma", LZMA_CODEC_CLASS_KEY, LZMA_CODEC_CLASS_DEFAULT) {
336      // Use base type to avoid compile-time dependencies.
337      private volatile transient CompressionCodec lzmaCodec;
338      private final transient Object lock = new Object();
339
340      @Override
341      CompressionCodec getCodec(Configuration conf) {
342        if (lzmaCodec == null) {
343          synchronized (lock) {
344            if (lzmaCodec == null) {
345              lzmaCodec = buildCodec(conf, this);
346            }
347          }
348        }
349        return lzmaCodec;
350      }
351
352      @Override
353      public CompressionCodec reload(Configuration conf) {
354        synchronized (lock) {
355          lzmaCodec = buildCodec(conf, this);
356          LOG.warn("Reloaded configuration for {}", name());
357          return lzmaCodec;
358        }
359      }
360    },
361
362    BROTLI("brotli", BROTLI_CODEC_CLASS_KEY, BROTLI_CODEC_CLASS_DEFAULT) {
363      // Use base type to avoid compile-time dependencies.
364      private volatile transient CompressionCodec brotliCodec;
365      private final transient Object lock = new Object();
366
367      @Override
368      CompressionCodec getCodec(Configuration conf) {
369        if (brotliCodec == null) {
370          synchronized (lock) {
371            if (brotliCodec == null) {
372              brotliCodec = buildCodec(conf, this);
373            }
374          }
375        }
376        return brotliCodec;
377      }
378
379      @Override
380      public CompressionCodec reload(Configuration conf) {
381        synchronized (lock) {
382          brotliCodec = buildCodec(conf, this);
383          LOG.warn("Reloaded configuration for {}", name());
384          return brotliCodec;
385        }
386      }
387    };
388
    /**
     * Configuration created in the constructor; it is the one handed to {@code getCodec} by the
     * stream and pool methods of this enum.
     */
    private final Configuration conf;
    /** Short algorithm name returned by {@code getName()}. */
    private final String compressName;
    /** Configuration key that {@code buildCodec} reads to find the codec class name. */
    private final String confKey;
    /** Codec class name {@code buildCodec} falls back to when {@code confKey} is not set. */
    private final String confDefault;
    /** data input buffer size to absorb small reads from application. */
    private static final int DATA_IBUF_SIZE = 1 * 1024;
    /** data output buffer size to absorb small writes from application. */
    private static final int DATA_OBUF_SIZE = 4 * 1024;
397
398    Algorithm(String name, String confKey, String confDefault) {
399      this.conf = HBaseConfiguration.create();
400      this.conf.setBoolean("io.native.lib.available", true);
401      this.compressName = name;
402      this.confKey = confKey;
403      this.confDefault = confDefault;
404    }
405
    /**
     * Returns the codec implementing this algorithm.
     * @param conf configuration a constant passes to {@code buildCodec} when it has to create the
     *             codec
     * @return the codec; the {@code NONE} constant returns {@code null}
     */
    abstract CompressionCodec getCodec(Configuration conf);
407
    /**
     * Reload configuration for the given algorithm.
     * <p>
     * NOTE: Experts only. This can only be done safely during process startup, before the
     * algorithm's codecs are in use. If the codec implementation is changed, the new implementation
     * may not be fully compatible with the implementation that was loaded earlier, leading to
     * potential data corruption. Mostly used by unit tests.
     * @param conf configuration
     * @return the codec built from {@code conf}; the {@code NONE} constant returns {@code null}
     */
    public abstract CompressionCodec reload(Configuration conf);
418
419    public InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor,
420      int downStreamBufferSize) throws IOException {
421      CompressionCodec codec = getCodec(conf);
422      // Set the internal buffer size to read from down stream.
423      if (downStreamBufferSize > 0) {
424        ((Configurable) codec).getConf().setInt("io.file.buffer.size", downStreamBufferSize);
425      }
426      CompressionInputStream cis = codec.createInputStream(downStream, decompressor);
427      BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
428      return bis2;
429
430    }
431
432    public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
433      int downStreamBufferSize) throws IOException {
434      OutputStream bos1 = null;
435      if (downStreamBufferSize > 0) {
436        bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
437      } else {
438        bos1 = downStream;
439      }
440      CompressionOutputStream cos = createPlainCompressionStream(bos1, compressor);
441      BufferedOutputStream bos2 =
442        new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
443      return bos2;
444    }
445
446    /**
447     * Creates a compression stream without any additional wrapping into buffering streams.
448     */
449    public CompressionOutputStream createPlainCompressionStream(OutputStream downStream,
450      Compressor compressor) throws IOException {
451      CompressionCodec codec = getCodec(conf);
452      ((Configurable) codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
453      return codec.createOutputStream(downStream, compressor);
454    }
455
456    public Compressor getCompressor() {
457      CompressionCodec codec = getCodec(conf);
458      if (codec != null) {
459        Compressor compressor = CodecPool.getCompressor(codec);
460        if (LOG.isTraceEnabled()) LOG.trace("Retrieved compressor " + compressor + " from pool.");
461        if (compressor != null) {
462          if (compressor.finished()) {
463            // Somebody returns the compressor to CodecPool but is still using it.
464            LOG.warn("Compressor obtained from CodecPool is already finished()");
465          }
466          compressor.reset();
467        }
468        return compressor;
469      }
470      return null;
471    }
472
473    public void returnCompressor(Compressor compressor) {
474      if (compressor != null) {
475        if (LOG.isTraceEnabled()) LOG.trace("Returning compressor " + compressor + " to pool.");
476        CodecPool.returnCompressor(compressor);
477      }
478    }
479
480    public Decompressor getDecompressor() {
481      CompressionCodec codec = getCodec(conf);
482      if (codec != null) {
483        Decompressor decompressor = CodecPool.getDecompressor(codec);
484        if (LOG.isTraceEnabled())
485          LOG.trace("Retrieved decompressor " + decompressor + " from pool.");
486        if (decompressor != null) {
487          if (decompressor.finished()) {
488            // Somebody returns the decompressor to CodecPool but is still using it.
489            LOG.warn("Decompressor {} obtained from CodecPool is already finished", decompressor);
490          }
491          decompressor.reset();
492        }
493        return decompressor;
494      }
495
496      return null;
497    }
498
499    public void returnDecompressor(Decompressor decompressor) {
500      if (decompressor != null) {
501        if (LOG.isTraceEnabled()) LOG.trace("Returning decompressor " + decompressor + " to pool.");
502        CodecPool.returnDecompressor(decompressor);
503        if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
504          if (LOG.isTraceEnabled()) LOG.trace("Ending decompressor " + decompressor);
505          decompressor.end();
506        }
507      }
508    }
509
510    public String getName() {
511      return compressName;
512    }
513  }
514
515  public static Algorithm getCompressionAlgorithmByName(String compressName) {
516    Algorithm[] algos = Algorithm.class.getEnumConstants();
517
518    for (Algorithm a : algos) {
519      if (a.getName().equals(compressName)) {
520        return a;
521      }
522    }
523
524    throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
525  }
526
527  /**
528   * Get names of supported compression algorithms.
529   * @return Array of strings, each represents a supported compression algorithm. Currently, the
530   *         following compression algorithms are supported.
531   */
532  public static String[] getSupportedAlgorithms() {
533    Algorithm[] algos = Algorithm.class.getEnumConstants();
534
535    String[] ret = new String[algos.length];
536    int i = 0;
537    for (Algorithm a : algos) {
538      ret[i++] = a.getName();
539    }
540
541    return ret;
542  }
543
544  /**
545   * Load a codec implementation for an algorithm using the supplied configuration.
546   * @param conf the configuration to use
547   * @param algo the algorithm to implement
548   */
549  private static CompressionCodec buildCodec(final Configuration conf, final Algorithm algo) {
550    try {
551      String codecClassName = conf.get(algo.confKey, algo.confDefault);
552      if (codecClassName == null) {
553        throw new RuntimeException("No codec configured for " + algo.confKey);
554      }
555      Class<?> codecClass = getClassLoaderForCodec().loadClass(codecClassName);
556      // The class is from hadoop so we use hadoop's ReflectionUtils to create it
557      CompressionCodec codec =
558        (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration(conf));
559      LOG.info("Loaded codec {} for compression algorithm {}", codec.getClass().getCanonicalName(),
560        algo.name());
561      return codec;
562    } catch (ClassNotFoundException e) {
563      throw new RuntimeException(e);
564    }
565  }
566
567  public static void main(String[] args) throws Exception {
568    Configuration conf = HBaseConfiguration.create();
569    java.util.Map<String, CompressionCodec> implMap = new java.util.HashMap<>();
570    for (Algorithm algo : Algorithm.class.getEnumConstants()) {
571      try {
572        implMap.put(algo.name(), algo.getCodec(conf));
573      } catch (Exception e) {
574        // Ignore failures to load codec native implementations while building the report.
575        // We are to report what is configured.
576      }
577    }
578    for (Algorithm algo : Algorithm.class.getEnumConstants()) {
579      System.out.println(algo.name() + ":");
580      System.out.println("    name: " + algo.getName());
581      System.out.println("    confKey: " + algo.confKey);
582      System.out.println("    confDefault: " + algo.confDefault);
583      CompressionCodec codec = implMap.get(algo.name());
584      System.out.println(
585        "    implClass: " + (codec != null ? codec.getClass().getCanonicalName() : "<none>"));
586    }
587  }
588
589}