/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.compress;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DoNotPool;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compression related stuff. Copied from hadoop-3315 tfile.
 */
@InterfaceAudience.Private
public final class Compression {
  private static final Logger LOG = LoggerFactory.getLogger(Compression.class);

  // LZO

  public static final String LZO_CODEC_CLASS_KEY = "hbase.io.compress.lzo.codec";
  public static final String LZO_CODEC_CLASS_DEFAULT = "com.hadoop.compression.lzo.LzoCodec";

  // GZ

  public static final String GZ_CODEC_CLASS_KEY = "hbase.io.compress.gz.codec";
  // Our ReusableStreamGzipCodec fixes an inefficiency in Hadoop's Gzip codec, allowing us to
  // reuse compression streams, but still requires the Hadoop native codec.
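  // A site configuration can swap in an alternative implementation via this key; for example
  // (the class name below is hypothetical):
  //   conf.set(GZ_CODEC_CLASS_KEY, "com.example.MyGzipCodec");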
  public static final String GZ_CODEC_CLASS_DEFAULT =
    "org.apache.hadoop.hbase.io.compress.ReusableStreamGzipCodec";

  // SNAPPY

  public static final String SNAPPY_CODEC_CLASS_KEY = "hbase.io.compress.snappy.codec";
  public static final String SNAPPY_CODEC_CLASS_DEFAULT =
    "org.apache.hadoop.io.compress.SnappyCodec";

  // LZ4

  public static final String LZ4_CODEC_CLASS_KEY = "hbase.io.compress.lz4.codec";
  public static final String LZ4_CODEC_CLASS_DEFAULT = "org.apache.hadoop.io.compress.Lz4Codec";

  // ZSTD

  public static final String ZSTD_CODEC_CLASS_KEY = "hbase.io.compress.zstd.codec";
  public static final String ZSTD_CODEC_CLASS_DEFAULT =
    "org.apache.hadoop.io.compress.ZStandardCodec";

  // BZIP2

  public static final String BZIP2_CODEC_CLASS_KEY = "hbase.io.compress.bzip2.codec";
  public static final String BZIP2_CODEC_CLASS_DEFAULT = "org.apache.hadoop.io.compress.BZip2Codec";

  // LZMA

  public static final String LZMA_CODEC_CLASS_KEY = "hbase.io.compress.lzma.codec";
  public static final String LZMA_CODEC_CLASS_DEFAULT =
    "org.apache.hadoop.hbase.io.compress.xz.LzmaCodec";

  // Brotli

  public static final String BROTLI_CODEC_CLASS_KEY = "hbase.io.compress.brotli.codec";
  public static final String BROTLI_CODEC_CLASS_DEFAULT =
    "org.apache.hadoop.hbase.io.compress.brotli.BrotliCodec";

  /**
   * Prevent instantiation of this utility class.
   */
  private Compression() {
    super();
  }

  /**
   * An OutputStream wrapper that finishes and resets the underlying CompressionOutputStream on
   * flush(), so every flush emits a complete, self-contained compressed unit.
   */
  static class FinishOnFlushCompressionStream extends FilterOutputStream {
    public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
      super(cout);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      // Delegate array writes directly; FilterOutputStream would otherwise write byte by byte.
      out.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
      CompressionOutputStream cout = (CompressionOutputStream) out;
      cout.finish();
      cout.flush();
      cout.resetState();
    }
  }

  /**
   * Returns the classloader to load the Codec class from.
   */
  private static ClassLoader getClassLoaderForCodec() {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    if (cl == null) {
      cl = Compression.class.getClassLoader();
    }
    if (cl == null) {
      cl = ClassLoader.getSystemClassLoader();
    }
    if (cl == null) {
      throw new RuntimeException("A ClassLoader to load the Codec could not be determined");
    }
    return cl;
  }

  /**
   * Compression algorithms. The ordinal of these cannot change or else you risk breaking all
   * existing HFiles out there. Even the ones that are not compressed! (They use the NONE algorithm)
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "SE_TRANSIENT_FIELD_NOT_RESTORED",
      justification = "We are not serializing so doesn't apply (not sure why transient though)")
  @SuppressWarnings("ImmutableEnumChecker")
  @InterfaceAudience.Public
  public static enum Algorithm {
    // LZO is GPL and requires extra install to setup. See
    // https://stackoverflow.com/questions/23441142/class-com-hadoop-compression-lzo-lzocodec-not-found-for-spark-on-cdh-5
    LZO("lzo", LZO_CODEC_CLASS_KEY, LZO_CODEC_CLASS_DEFAULT) {
      // Use base type to avoid compile-time dependencies.
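      // Lazily initialized with double-checked locking below; the field must be volatile for the
      // pattern to publish the constructed codec safely across threads.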
      private volatile transient CompressionCodec lzoCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (lzoCodec == null) {
          synchronized (lock) {
            if (lzoCodec == null) {
              lzoCodec = buildCodec(conf, this);
            }
          }
        }
        return lzoCodec;
      }

      @Override
      public CompressionCodec reload(Configuration conf) {
        synchronized (lock) {
          lzoCodec = buildCodec(conf, this);
          LOG.warn("Reloaded configuration for {}", name());
          return lzoCodec;
        }
      }
    },

    GZ("gz", GZ_CODEC_CLASS_KEY, GZ_CODEC_CLASS_DEFAULT) {
      private volatile transient CompressionCodec gzCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (gzCodec == null) {
          synchronized (lock) {
            if (gzCodec == null) {
              gzCodec = buildCodec(conf, this);
            }
          }
        }
        return gzCodec;
      }

      @Override
      public CompressionCodec reload(Configuration conf) {
        synchronized (lock) {
          gzCodec = buildCodec(conf, this);
          LOG.warn("Reloaded configuration for {}", name());
          return gzCodec;
        }
      }
    },

    NONE("none", "", "") {
      @Override
      CompressionCodec getCodec(Configuration conf) {
        return null;
      }

      @Override
      public CompressionCodec reload(Configuration conf) {
        return null;
      }

      @Override
      public synchronized InputStream createDecompressionStream(InputStream downStream,
        Decompressor decompressor, int downStreamBufferSize) throws IOException {
        if (downStreamBufferSize > 0) {
          return new BufferedInputStream(downStream, downStreamBufferSize);
        }
        return downStream;
      }

      @Override
      public synchronized OutputStream createCompressionStream(OutputStream downStream,
        Compressor compressor, int downStreamBufferSize) throws IOException {
        if (downStreamBufferSize > 0) {
          return new BufferedOutputStream(downStream, downStreamBufferSize);
        }
        return downStream;
      }
    },

    SNAPPY("snappy", SNAPPY_CODEC_CLASS_KEY, SNAPPY_CODEC_CLASS_DEFAULT) {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec snappyCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (snappyCodec == null) {
          synchronized (lock) {
            if (snappyCodec == null) {
              snappyCodec = buildCodec(conf, this);
            }
          }
        }
        return snappyCodec;
      }

      @Override
      public CompressionCodec reload(Configuration conf) {
        synchronized (lock) {
          snappyCodec = buildCodec(conf, this);
          LOG.warn("Reloaded configuration for {}", name());
          return snappyCodec;
        }
      }
    },

    LZ4("lz4", LZ4_CODEC_CLASS_KEY, LZ4_CODEC_CLASS_DEFAULT) {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec lz4Codec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (lz4Codec == null) {
          synchronized (lock) {
            if (lz4Codec == null) {
              lz4Codec = buildCodec(conf, this);
            }
          }
        }
        return lz4Codec;
      }

      @Override
      public CompressionCodec reload(Configuration conf) {
        synchronized (lock) {
          lz4Codec = buildCodec(conf, this);
          LOG.warn("Reloaded configuration for {}", name());
          return lz4Codec;
        }
      }
    },

    BZIP2("bzip2", BZIP2_CODEC_CLASS_KEY, BZIP2_CODEC_CLASS_DEFAULT) {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec bzipCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (bzipCodec == null) {
          synchronized (lock) {
            if (bzipCodec == null) {
              bzipCodec = buildCodec(conf, this);
            }
          }
        }
        return bzipCodec;
      }

      @Override
      public CompressionCodec reload(Configuration conf) {
        synchronized (lock) {
          bzipCodec = buildCodec(conf, this);
          LOG.warn("Reloaded configuration for {}", name());
          return bzipCodec;
        }
      }
    },

    ZSTD("zstd", ZSTD_CODEC_CLASS_KEY, ZSTD_CODEC_CLASS_DEFAULT) {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec zStandardCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (zStandardCodec == null) {
          synchronized (lock) {
            if (zStandardCodec == null) {
              zStandardCodec = buildCodec(conf, this);
            }
          }
        }
        return zStandardCodec;
      }

      @Override
      public CompressionCodec reload(Configuration conf) {
        synchronized (lock) {
          zStandardCodec = buildCodec(conf, this);
          LOG.warn("Reloaded configuration for {}", name());
          return zStandardCodec;
        }
      }
    },

    LZMA("lzma", LZMA_CODEC_CLASS_KEY, LZMA_CODEC_CLASS_DEFAULT) {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec lzmaCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (lzmaCodec == null) {
          synchronized (lock) {
            if (lzmaCodec == null) {
              lzmaCodec = buildCodec(conf, this);
            }
          }
        }
        return lzmaCodec;
      }

      @Override
      public CompressionCodec reload(Configuration conf) {
        synchronized (lock) {
          lzmaCodec = buildCodec(conf, this);
          LOG.warn("Reloaded configuration for {}", name());
          return lzmaCodec;
        }
      }
    },

    BROTLI("brotli", BROTLI_CODEC_CLASS_KEY, BROTLI_CODEC_CLASS_DEFAULT) {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec brotliCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (brotliCodec == null) {
          synchronized (lock) {
            if (brotliCodec == null) {
              brotliCodec = buildCodec(conf, this);
            }
          }
        }
        return brotliCodec;
      }

      @Override
      public CompressionCodec reload(Configuration conf) {
        synchronized (lock) {
          brotliCodec = buildCodec(conf, this);
          LOG.warn("Reloaded configuration for {}", name());
          return brotliCodec;
        }
      }
    };

    private final Configuration conf;
    private final String compressName;
    private final String confKey;
    private final String confDefault;
    /** data input buffer size to absorb small reads from application. */
    private static final int DATA_IBUF_SIZE = 1 * 1024;
    /** data output buffer size to absorb small writes from application. */
    private static final int DATA_OBUF_SIZE = 4 * 1024;

    Algorithm(String name, String confKey, String confDefault) {
      this.conf = HBaseConfiguration.create();
      this.conf.setBoolean("io.native.lib.available", true);
      this.compressName = name;
      this.confKey = confKey;
      this.confDefault = confDefault;
    }

    abstract CompressionCodec getCodec(Configuration conf);

    /**
     * Reload configuration for the given algorithm.
     * <p>
     * NOTE: Experts only. This can only be done safely during process startup, before the
     * algorithm's codecs are in use. If the codec implementation is changed, the new implementation
     * may not be fully compatible with what was loaded at static initialization time, leading to
     * potential data corruption. Mostly used by unit tests.
     * @param conf configuration
     */
    public abstract CompressionCodec reload(Configuration conf);

    public InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor,
      int downStreamBufferSize) throws IOException {
      CompressionCodec codec = getCodec(conf);
      // Set the internal buffer size to read from down stream.
      if (downStreamBufferSize > 0) {
        ((Configurable) codec).getConf().setInt("io.file.buffer.size", downStreamBufferSize);
      }
      CompressionInputStream cis = codec.createInputStream(downStream, decompressor);
      return new BufferedInputStream(cis, DATA_IBUF_SIZE);
    }

    public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
      int downStreamBufferSize) throws IOException {
      OutputStream bos1;
      if (downStreamBufferSize > 0) {
        bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
      } else {
        bos1 = downStream;
      }
      CompressionOutputStream cos = createPlainCompressionStream(bos1, compressor);
      return new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
    }

    /**
     * Creates a compression stream without any additional wrapping into buffering streams.
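     * <p>
     * A minimal usage sketch, assuming {@code out} is a destination stream and {@code data} a
     * byte array (both illustrative, not part of this API):
     *
     * <pre>{@code
     * Compression.Algorithm algo = Compression.getCompressionAlgorithmByName("gz");
     * Compressor compressor = algo.getCompressor();
     * try {
     *   CompressionOutputStream cos = algo.createPlainCompressionStream(out, compressor);
     *   cos.write(data, 0, data.length);
     *   cos.finish();
     * } finally {
     *   algo.returnCompressor(compressor);
     * }
     * }</pre>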
     */
    public CompressionOutputStream createPlainCompressionStream(OutputStream downStream,
      Compressor compressor) throws IOException {
      CompressionCodec codec = getCodec(conf);
      ((Configurable) codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
      return codec.createOutputStream(downStream, compressor);
    }

    public Compressor getCompressor() {
      CompressionCodec codec = getCodec(conf);
      if (codec != null) {
        Compressor compressor = CodecPool.getCompressor(codec);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Retrieved compressor {} from pool.", compressor);
        }
        if (compressor != null) {
          if (compressor.finished()) {
            // Somebody returned the compressor to the CodecPool but is still using it.
            LOG.warn("Compressor obtained from CodecPool is already finished()");
          }
          compressor.reset();
        }
        return compressor;
      }
      return null;
    }

    public void returnCompressor(Compressor compressor) {
      if (compressor != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Returning compressor {} to pool.", compressor);
        }
        CodecPool.returnCompressor(compressor);
      }
    }

    public Decompressor getDecompressor() {
      CompressionCodec codec = getCodec(conf);
      if (codec != null) {
        Decompressor decompressor = CodecPool.getDecompressor(codec);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Retrieved decompressor {} from pool.", decompressor);
        }
        if (decompressor != null) {
          if (decompressor.finished()) {
            // Somebody returned the decompressor to the CodecPool but is still using it.
            LOG.warn("Decompressor {} obtained from CodecPool is already finished", decompressor);
          }
          decompressor.reset();
        }
        return decompressor;
      }
      return null;
    }

    public void returnDecompressor(Decompressor decompressor) {
      if (decompressor != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Returning decompressor {} to pool.", decompressor);
        }
        CodecPool.returnDecompressor(decompressor);
        if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Ending decompressor {}", decompressor);
          }
          decompressor.end();
        }
      }
    }

    /**
     * Signals if this codec theoretically supports decompression on {@link ByteBuff}s. This can be
     * faster than using a DecompressionStream. If this method returns true, you can call
     * {@link #getByteBuffDecompressor()} to obtain a {@link ByteBuffDecompressor}. You must then
     * also call {@link ByteBuffDecompressor#canDecompress(ByteBuff, ByteBuff)} before attempting
     * decompression, to verify whether that decompressor is capable of handling your particular
     * input and output buffers.
     */
    public boolean supportsByteBuffDecompression() {
      CompressionCodec codec = getCodec(conf);
      return codec instanceof ByteBuffDecompressionCodec;
    }

    /**
     * Be sure to call {@link #supportsByteBuffDecompression()} before calling this method.
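     * <p>
     * A sketch of the expected check-then-borrow sequence (buffer setup and the decompress call
     * itself are elided; see {@link ByteBuffDecompressor} for the full contract):
     *
     * <pre>{@code
     * if (algo.supportsByteBuffDecompression()) {
     *   ByteBuffDecompressor decompressor = algo.getByteBuffDecompressor();
     *   try {
     *     if (decompressor.canDecompress(output, input)) {
     *       // ... decompress input into output ...
     *     }
     *   } finally {
     *     algo.returnByteBuffDecompressor(decompressor);
     *   }
     * }
     * }</pre>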
     * @throws IllegalStateException if the codec does not support block decompression
     */
    public ByteBuffDecompressor getByteBuffDecompressor() {
      CompressionCodec codec = getCodec(conf);
      if (codec instanceof ByteBuffDecompressionCodec) {
        ByteBuffDecompressor decompressor =
          CodecPool.getByteBuffDecompressor((ByteBuffDecompressionCodec) codec);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Retrieved decompressor {} from pool.", decompressor);
        }
        return decompressor;
      } else {
        throw new IllegalStateException("Codec " + codec + " does not support block decompression");
      }
    }

    public void returnByteBuffDecompressor(ByteBuffDecompressor decompressor) {
      if (decompressor != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Returning decompressor {} to pool.", decompressor);
        }
        CodecPool.returnByteBuffDecompressor(decompressor);
      }
    }

    /**
     * Get an object that holds settings used by ByteBuffDecompressor. It's expensive to pull these
     * from a Configuration object every time we decompress a block, so pull them once here when,
     * for example, opening an HFile, and reuse the returned HFileDecompressionContext as much as
     * possible. The concrete class of this object will be one that is specific to the codec
     * implementation in use. You don't need to inspect it yourself, just pass it along to
     * {@link ByteBuffDecompressor#reinit(HFileDecompressionContext)}.
     */
    @Nullable
    public HFileDecompressionContext
      getHFileDecompressionContextForConfiguration(Configuration conf) {
      if (supportsByteBuffDecompression()) {
        return ((ByteBuffDecompressionCodec) getCodec(conf))
          .getDecompressionContextFromConfiguration(conf);
      } else {
        return null;
      }
    }

    public String getName() {
      return compressName;
    }
  }

  /**
   * See {@link Algorithm#getHFileDecompressionContextForConfiguration(Configuration)}.
   */
  public static abstract class HFileDecompressionContext implements Closeable, HeapSize {
  }

  public static Algorithm getCompressionAlgorithmByName(String compressName) {
    Algorithm[] algos = Algorithm.class.getEnumConstants();

    for (Algorithm a : algos) {
      if (a.getName().equals(compressName)) {
        return a;
      }
    }

    throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
  }

  /**
   * Get names of supported compression algorithms.
   * @return Array of strings, each representing a supported compression algorithm.
   */
  public static String[] getSupportedAlgorithms() {
    Algorithm[] algos = Algorithm.class.getEnumConstants();

    String[] ret = new String[algos.length];
    int i = 0;
    for (Algorithm a : algos) {
      ret[i++] = a.getName();
    }

    return ret;
  }

  /**
   * Load a codec implementation for an algorithm using the supplied configuration.
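   * The codec class name is looked up in {@code conf} under the algorithm's configuration key,
   * falling back to the algorithm's compile-time default, and is then instantiated through
   * Hadoop's ReflectionUtils with a copy of the configuration.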
   * @param conf the configuration to use
   * @param algo the algorithm to implement
   */
  private static CompressionCodec buildCodec(final Configuration conf, final Algorithm algo) {
    try {
      String codecClassName = conf.get(algo.confKey, algo.confDefault);
      if (codecClassName == null) {
        throw new RuntimeException("No codec configured for " + algo.confKey);
      }
      Class<?> codecClass = getClassLoaderForCodec().loadClass(codecClassName);
      // The class is from Hadoop, so we use Hadoop's ReflectionUtils to create it.
      CompressionCodec codec =
        (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration(conf));
      LOG.info("Loaded codec {} for compression algorithm {}", codec.getClass().getCanonicalName(),
        algo.name());
      return codec;
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    java.util.Map<String, CompressionCodec> implMap = new java.util.HashMap<>();
    for (Algorithm algo : Algorithm.class.getEnumConstants()) {
      try {
        implMap.put(algo.name(), algo.getCodec(conf));
      } catch (Exception e) {
        // Ignore failures to load codec native implementations while building the report.
        // We only want to report what is configured.
      }
    }
    for (Algorithm algo : Algorithm.class.getEnumConstants()) {
      System.out.println(algo.name() + ":");
      System.out.println(" name: " + algo.getName());
      System.out.println(" confKey: " + algo.confKey);
      System.out.println(" confDefault: " + algo.confDefault);
      CompressionCodec codec = implMap.get(algo.name());
      System.out.println(
        " implClass: " + (codec != null ? codec.getClass().getCanonicalName() : "<none>"));
    }
  }

}