/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.compress;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.EnumMap;
import java.util.Map;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DoNotPool;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compression related stuff. Copied from hadoop-3315 tfile.
 * <p>
 * Holds the configuration keys and default codec class names for each supported compression
 * algorithm, the {@link Algorithm} enum that lazily loads the configured codec, and a few static
 * lookup helpers.
 */
@InterfaceAudience.Private
public final class Compression {
  private static final Logger LOG = LoggerFactory.getLogger(Compression.class);

  // LZO

  public static final String LZO_CODEC_CLASS_KEY = "hbase.io.compress.lzo.codec";
  public static final String LZO_CODEC_CLASS_DEFAULT = "com.hadoop.compression.lzo.LzoCodec";

  // GZ

  public static final String GZ_CODEC_CLASS_KEY = "hbase.io.compress.gz.codec";
  // Our ReusableStreamGzipCodec fixes an inefficiency in Hadoop's Gzip codec, allowing us to
  // reuse compression streams, but still requires the Hadoop native codec.
  public static final String GZ_CODEC_CLASS_DEFAULT =
    "org.apache.hadoop.hbase.io.compress.ReusableStreamGzipCodec";

  // SNAPPY

  public static final String SNAPPY_CODEC_CLASS_KEY = "hbase.io.compress.snappy.codec";
  public static final String SNAPPY_CODEC_CLASS_DEFAULT =
    "org.apache.hadoop.io.compress.SnappyCodec";

  // LZ4

  public static final String LZ4_CODEC_CLASS_KEY = "hbase.io.compress.lz4.codec";
  public static final String LZ4_CODEC_CLASS_DEFAULT = "org.apache.hadoop.io.compress.Lz4Codec";

  // ZSTD

  public static final String ZSTD_CODEC_CLASS_KEY = "hbase.io.compress.zstd.codec";
  public static final String ZSTD_CODEC_CLASS_DEFAULT =
    "org.apache.hadoop.io.compress.ZStandardCodec";

  // BZIP2

  public static final String BZIP2_CODEC_CLASS_KEY = "hbase.io.compress.bzip2.codec";
  public static final String BZIP2_CODEC_CLASS_DEFAULT = "org.apache.hadoop.io.compress.BZip2Codec";

  // LZMA

  public static final String LZMA_CODEC_CLASS_KEY = "hbase.io.compress.lzma.codec";
  public static final String LZMA_CODEC_CLASS_DEFAULT =
    "org.apache.hadoop.hbase.io.compress.xz.LzmaCodec";

  // Brotli

  public static final String BROTLI_CODEC_CLASS_KEY = "hbase.io.compress.brotli.codec";
  public static final String BROTLI_CODEC_CLASS_DEFAULT =
    "org.apache.hadoop.hbase.io.compress.brotli.BrotliCodec";

  /**
   * Prevent the instantiation of class.
   */
  private Compression() {
    super();
  }

  /**
   * Output stream wrapper whose {@link #flush()} finishes the wrapped
   * {@link CompressionOutputStream}, flushes it, and then resets its state.
   */
  static class FinishOnFlushCompressionStream extends FilterOutputStream {
    public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
      super(cout);
    }

    /**
     * Hands the whole range to the wrapped stream in a single call instead of relying on
     * {@link FilterOutputStream}'s byte-at-a-time default.
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
    }

    /**
     * Finishes the compressed data written so far, flushes it downstream and resets the
     * compression stream so that it can accept further writes.
     */
    @Override
    public void flush() throws IOException {
      CompressionOutputStream cout = (CompressionOutputStream) out;
      cout.finish();
      cout.flush();
      cout.resetState();
    }
  }

  /**
   * Returns the classloader to load the Codec class from. The thread context classloader is
   * preferred, then the loader of this class, then the system classloader.
   * @throws RuntimeException if none of the three is available
   */
  private static ClassLoader getClassLoaderForCodec() {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    if (cl == null) {
      cl = Compression.class.getClassLoader();
    }
    if (cl == null) {
      cl = ClassLoader.getSystemClassLoader();
    }
    if (cl == null) {
      throw new RuntimeException("A ClassLoader to load the Codec could not be determined");
    }
    return cl;
  }

  /**
   * Compression algorithms. The ordinal of these cannot change or else you risk breaking all
   * existing HFiles out there. Even the ones that are not compressed! (They use the NONE algorithm)
   * <p>
   * Every constant except {@code NONE} builds its codec on the first {@code getCodec} call and
   * caches it in a volatile field guarded by a per-constant lock; {@code reload} replaces the
   * cached codec under the same lock.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "SE_TRANSIENT_FIELD_NOT_RESTORED",
      justification = "We are not serializing so doesn't apply (not sure why transient though)")
  @SuppressWarnings("ImmutableEnumChecker")
  @InterfaceAudience.Public
  public static enum Algorithm {
    // LZO is GPL and requires extra install to setup. See
    // https://stackoverflow.com/questions/23441142/class-com-hadoop-compression-lzo-lzocodec-not-found-for-spark-on-cdh-5
    LZO("lzo", LZO_CODEC_CLASS_KEY, LZO_CODEC_CLASS_DEFAULT) {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec lzoCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (lzoCodec == null) {
          synchronized (lock) {
            if (lzoCodec == null) {
              lzoCodec = buildCodec(conf, this);
            }
          }
        }
        return lzoCodec;
      }

      @Override
      public CompressionCodec reload(Configuration conf) {
        synchronized (lock) {
          lzoCodec = buildCodec(conf, this);
          LOG.warn("Reloaded configuration for {}", name());
          return lzoCodec;
        }
      }
    },

    GZ("gz", GZ_CODEC_CLASS_KEY, GZ_CODEC_CLASS_DEFAULT) {
      private volatile transient CompressionCodec gzCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (gzCodec == null) {
          synchronized (lock) {
            if (gzCodec == null) {
              gzCodec = buildCodec(conf, this);
            }
          }
        }
        return gzCodec;
      }

      @Override
      public CompressionCodec reload(Configuration conf) {
        synchronized (lock) {
          gzCodec = buildCodec(conf, this);
          LOG.warn("Reloaded configuration for {}", name());
          return gzCodec;
        }
      }
    },

    NONE("none", "", "") {
      // There is no codec for NONE; the stream factories below only add optional buffering.
      @Override
      CompressionCodec getCodec(Configuration conf) {
        return null;
      }

      @Override
      public CompressionCodec reload(Configuration conf) {
        return null;
      }

      @Override
      public synchronized InputStream createDecompressionStream(InputStream downStream,
        Decompressor decompressor, int downStreamBufferSize) throws IOException {
        if (downStreamBufferSize > 0) {
          return new BufferedInputStream(downStream, downStreamBufferSize);
        }
        return downStream;
      }

      @Override
      public synchronized OutputStream createCompressionStream(OutputStream downStream,
        Compressor compressor, int downStreamBufferSize) throws IOException {
        if (downStreamBufferSize > 0) {
          return new BufferedOutputStream(downStream, downStreamBufferSize);
        }

        return downStream;
      }
    },
    SNAPPY("snappy", SNAPPY_CODEC_CLASS_KEY, SNAPPY_CODEC_CLASS_DEFAULT) {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec snappyCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (snappyCodec == null) {
          synchronized (lock) {
            if (snappyCodec == null) {
              snappyCodec = buildCodec(conf, this);
            }
          }
        }
        return snappyCodec;
      }

      @Override
      public CompressionCodec reload(Configuration conf) {
        synchronized (lock) {
          snappyCodec = buildCodec(conf, this);
          LOG.warn("Reloaded configuration for {}", name());
          return snappyCodec;
        }
      }
    },
    LZ4("lz4", LZ4_CODEC_CLASS_KEY, LZ4_CODEC_CLASS_DEFAULT) {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec lz4Codec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (lz4Codec == null) {
          synchronized (lock) {
            if (lz4Codec == null) {
              lz4Codec = buildCodec(conf, this);
            }
          }
        }
        return lz4Codec;
      }

      @Override
      public CompressionCodec reload(Configuration conf) {
        synchronized (lock) {
          lz4Codec = buildCodec(conf, this);
          LOG.warn("Reloaded configuration for {}", name());
          return lz4Codec;
        }
      }
    },
    BZIP2("bzip2", BZIP2_CODEC_CLASS_KEY, BZIP2_CODEC_CLASS_DEFAULT) {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec bzipCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (bzipCodec == null) {
          synchronized (lock) {
            if (bzipCodec == null) {
              bzipCodec = buildCodec(conf, this);
            }
          }
        }
        return bzipCodec;
      }

      @Override
      public CompressionCodec reload(Configuration conf) {
        synchronized (lock) {
          bzipCodec = buildCodec(conf, this);
          LOG.warn("Reloaded configuration for {}", name());
          return bzipCodec;
        }
      }
    },
    ZSTD("zstd", ZSTD_CODEC_CLASS_KEY, ZSTD_CODEC_CLASS_DEFAULT) {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec zStandardCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (zStandardCodec == null) {
          synchronized (lock) {
            if (zStandardCodec == null) {
              zStandardCodec = buildCodec(conf, this);
            }
          }
        }
        return zStandardCodec;
      }

      @Override
      public CompressionCodec reload(Configuration conf) {
        synchronized (lock) {
          zStandardCodec = buildCodec(conf, this);
          LOG.warn("Reloaded configuration for {}", name());
          return zStandardCodec;
        }
      }
    },
    LZMA("lzma", LZMA_CODEC_CLASS_KEY, LZMA_CODEC_CLASS_DEFAULT) {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec lzmaCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (lzmaCodec == null) {
          synchronized (lock) {
            if (lzmaCodec == null) {
              lzmaCodec = buildCodec(conf, this);
            }
          }
        }
        return lzmaCodec;
      }

      @Override
      public CompressionCodec reload(Configuration conf) {
        synchronized (lock) {
          lzmaCodec = buildCodec(conf, this);
          LOG.warn("Reloaded configuration for {}", name());
          return lzmaCodec;
        }
      }
    },

    BROTLI("brotli", BROTLI_CODEC_CLASS_KEY, BROTLI_CODEC_CLASS_DEFAULT) {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec brotliCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (brotliCodec == null) {
          synchronized (lock) {
            if (brotliCodec == null) {
              brotliCodec = buildCodec(conf, this);
            }
          }
        }
        return brotliCodec;
      }

      @Override
      public CompressionCodec reload(Configuration conf) {
        synchronized (lock) {
          brotliCodec = buildCodec(conf, this);
          LOG.warn("Reloaded configuration for {}", name());
          return brotliCodec;
        }
      }
    };

    /** Configuration created by the constructor and passed to {@code getCodec} by this enum. */
    private final Configuration conf;
    private final String compressName;
    private final String confKey;
    private final String confDefault;
    /** data input buffer size to absorb small reads from application. */
    private static final int DATA_IBUF_SIZE = 1 * 1024;
    /** data output buffer size to absorb small writes from application. */
    private static final int DATA_OBUF_SIZE = 4 * 1024;

    /**
     * @param name        the algorithm name returned by {@link #getName()}
     * @param confKey     configuration key that names the codec implementation class
     * @param confDefault codec class name used when {@code confKey} is not set
     */
    Algorithm(String name, String confKey, String confDefault) {
      this.conf = HBaseConfiguration.create();
      this.conf.setBoolean("io.native.lib.available", true);
      this.compressName = name;
      this.confKey = confKey;
      this.confDefault = confDefault;
    }

    /**
     * Returns the codec for this algorithm, building it from {@code conf} if it has not been
     * built yet.
     * @param conf configuration used when the codec has to be built
     * @return the codec, or {@code null} for {@code NONE}
     */
    abstract CompressionCodec getCodec(Configuration conf);

    /**
     * Reload configuration for the given algorithm.
     * <p>
     * NOTE: Experts only. This can only be done safely during process startup, before the
     * algorithm's codecs are in use. If the codec implementation is changed, the new implementation
     * may not be fully compatible with what was loaded at static initialization time, leading to
     * potential data corruption. Mostly used by unit tests.
     * @param conf configuration
     * @return the newly built codec, or {@code null} for {@code NONE}
     */
    public abstract CompressionCodec reload(Configuration conf);

    /**
     * Creates a buffered stream that decompresses what is read from {@code downStream}.
     * @param downStream           the stream to read compressed data from
     * @param decompressor         the decompressor passed to the codec
     * @param downStreamBufferSize if positive, written to {@code io.file.buffer.size} in the
     *                             codec's configuration before the stream is created
     * @return the codec's input stream wrapped in a {@link BufferedInputStream}
     * @throws IOException if the codec fails to create its input stream
     */
    public InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor,
      int downStreamBufferSize) throws IOException {
      CompressionCodec codec = getCodec(conf);
      // Set the internal buffer size to read from down stream.
      if (downStreamBufferSize > 0) {
        ((Configurable) codec).getConf().setInt("io.file.buffer.size", downStreamBufferSize);
      }
      CompressionInputStream cis = codec.createInputStream(downStream, decompressor);
      return new BufferedInputStream(cis, DATA_IBUF_SIZE);
    }

    /**
     * Creates a buffered stream that compresses what is written to it and forwards the result to
     * {@code downStream}. Flushing the returned stream finishes and resets the compression stream
     * (see {@link FinishOnFlushCompressionStream}).
     * @param downStream           the stream that receives compressed data
     * @param compressor           the compressor passed to the codec
     * @param downStreamBufferSize if positive, {@code downStream} is first wrapped in a
     *                             {@link BufferedOutputStream} of this size
     * @return the buffered compressing stream
     * @throws IOException if the codec fails to create its output stream
     */
    public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
      int downStreamBufferSize) throws IOException {
      OutputStream sink = downStreamBufferSize > 0
        ? new BufferedOutputStream(downStream, downStreamBufferSize)
        : downStream;
      CompressionOutputStream cos = createPlainCompressionStream(sink, compressor);
      return new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
    }

    /**
     * Creates a compression stream without any additional wrapping into buffering streams.
     * @param downStream the stream that receives compressed data
     * @param compressor the compressor passed to the codec
     * @throws IOException if the codec fails to create its output stream
     */
    public CompressionOutputStream createPlainCompressionStream(OutputStream downStream,
      Compressor compressor) throws IOException {
      CompressionCodec codec = getCodec(conf);
      ((Configurable) codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
      return codec.createOutputStream(downStream, compressor);
    }

    /**
     * Obtains a compressor for this algorithm from {@link CodecPool} and resets it.
     * @return the compressor; {@code null} if this algorithm has no codec or the pool returned
     *         none
     */
    public Compressor getCompressor() {
      CompressionCodec codec = getCodec(conf);
      if (codec == null) {
        return null;
      }
      Compressor compressor = CodecPool.getCompressor(codec);
      LOG.trace("Retrieved compressor {} from pool.", compressor);
      if (compressor != null) {
        if (compressor.finished()) {
          // Somebody returns the compressor to CodecPool but is still using it.
          LOG.warn("Compressor obtained from CodecPool is already finished()");
        }
        compressor.reset();
      }
      return compressor;
    }

    /**
     * Hands a compressor back to {@link CodecPool}. A {@code null} argument is ignored.
     */
    public void returnCompressor(Compressor compressor) {
      if (compressor != null) {
        LOG.trace("Returning compressor {} to pool.", compressor);
        CodecPool.returnCompressor(compressor);
      }
    }

    /**
     * Obtains a decompressor for this algorithm from {@link CodecPool} and resets it.
     * @return the decompressor; {@code null} if this algorithm has no codec or the pool returned
     *         none
     */
    public Decompressor getDecompressor() {
      CompressionCodec codec = getCodec(conf);
      if (codec == null) {
        return null;
      }
      Decompressor decompressor = CodecPool.getDecompressor(codec);
      LOG.trace("Retrieved decompressor {} from pool.", decompressor);
      if (decompressor != null) {
        if (decompressor.finished()) {
          // Somebody returns the decompressor to CodecPool but is still using it.
          LOG.warn("Decompressor {} obtained from CodecPool is already finished", decompressor);
        }
        decompressor.reset();
      }
      return decompressor;
    }

    /**
     * Hands a decompressor back to {@link CodecPool}, then calls {@code end()} on it when its
     * class is annotated with {@link DoNotPool}. A {@code null} argument is ignored.
     */
    public void returnDecompressor(Decompressor decompressor) {
      if (decompressor != null) {
        LOG.trace("Returning decompressor {} to pool.", decompressor);
        CodecPool.returnDecompressor(decompressor);
        if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
          LOG.trace("Ending decompressor {}", decompressor);
          decompressor.end();
        }
      }
    }

    /** Returns the algorithm name given to the constructor, e.g. {@code "gz"}. */
    public String getName() {
      return compressName;
    }
  }

  /**
   * Looks up an algorithm by the name reported by {@link Algorithm#getName()}.
   * @param compressName the algorithm name; the comparison is exact and case-sensitive
   * @return the matching algorithm
   * @throws IllegalArgumentException if no algorithm has that name
   */
  public static Algorithm getCompressionAlgorithmByName(String compressName) {
    for (Algorithm a : Algorithm.values()) {
      if (a.getName().equals(compressName)) {
        return a;
      }
    }

    throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
  }

  /**
   * Get names of supported compression algorithms.
   * @return Array of strings holding {@link Algorithm#getName()} for every {@link Algorithm}
   *         constant, in declaration order.
   */
  public static String[] getSupportedAlgorithms() {
    Algorithm[] algos = Algorithm.values();
    String[] ret = new String[algos.length];
    for (int i = 0; i < algos.length; i++) {
      ret[i] = algos[i].getName();
    }
    return ret;
  }

  /**
   * Load a codec implementation for an algorithm using the supplied configuration.
   * @param conf the configuration to use
   * @param algo the algorithm to implement
   * @return a new codec instance, created with a copy of {@code conf}
   * @throws RuntimeException if no codec class is configured or the class cannot be found
   */
  private static CompressionCodec buildCodec(final Configuration conf, final Algorithm algo) {
    String codecClassName = conf.get(algo.confKey, algo.confDefault);
    if (codecClassName == null) {
      throw new RuntimeException("No codec configured for " + algo.confKey);
    }
    try {
      Class<?> codecClass = getClassLoaderForCodec().loadClass(codecClassName);
      // The class is from hadoop so we use hadoop's ReflectionUtils to create it
      CompressionCodec codec =
        (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration(conf));
      LOG.info("Loaded codec {} for compression algorithm {}", codec.getClass().getCanonicalName(),
        algo.name());
      return codec;
    } catch (ClassNotFoundException e) {
      // Say which class and which setting failed; the bare wrapper gave operators no hint.
      throw new RuntimeException("Codec class " + codecClassName + " (from " + algo.confKey
        + ") for compression algorithm " + algo.name() + " could not be found", e);
    }
  }

  /**
   * Prints, for every algorithm, its name, configuration key, default codec class and the codec
   * implementation class that could actually be loaded ({@code <none>} if there is none).
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Map<Algorithm, CompressionCodec> implMap = new EnumMap<>(Algorithm.class);
    for (Algorithm algo : Algorithm.values()) {
      try {
        implMap.put(algo, algo.getCodec(conf));
      } catch (Exception ignored) {
        // Ignore failures to load codec native implementations while building the report.
        // We are to report what is configured.
      }
    }
    for (Algorithm algo : Algorithm.values()) {
      System.out.println(algo.name() + ":");
      System.out.println("    name: " + algo.getName());
      System.out.println("    confKey: " + algo.confKey);
      System.out.println("    confDefault: " + algo.confDefault);
      CompressionCodec codec = implMap.get(algo);
      System.out.println(
        "    implClass: " + (codec != null ? codec.getClass().getCanonicalName() : "<none>"));
    }
  }

}