001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.compress; 019 020import com.github.benmanes.caffeine.cache.Caffeine; 021import com.github.benmanes.caffeine.cache.LoadingCache; 022import edu.umd.cs.findbugs.annotations.Nullable; 023import java.util.Comparator; 024import java.util.NavigableSet; 025import java.util.Set; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028import java.util.concurrent.ConcurrentSkipListSet; 029import java.util.concurrent.atomic.AtomicInteger; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.io.compress.CompressionCodec; 032import org.apache.hadoop.io.compress.Compressor; 033import org.apache.hadoop.io.compress.Decompressor; 034import org.apache.hadoop.io.compress.DoNotPool; 035import org.apache.hadoop.util.ReflectionUtils; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * A global compressor/decompressor pool used to save and reuse (possibly native) 042 * compression/decompression codecs. Copied from the class of the same name in hadoop-common and 043 * augmented to improve borrow/return performance. 044 */ 045@InterfaceAudience.Private 046public class CodecPool { 047 private static final Logger LOG = LoggerFactory.getLogger(CodecPool.class); 048 049 private static final ConcurrentMap<Class<Compressor>, NavigableSet<Compressor>> COMPRESSOR_POOL = 050 new ConcurrentHashMap<>(); 051 052 private static final ConcurrentMap<Class<Decompressor>, 053 NavigableSet<Decompressor>> DECOMPRESSOR_POOL = new ConcurrentHashMap<>(); 054 055 private static <T> LoadingCache<Class<T>, AtomicInteger> createCache() { 056 return Caffeine.newBuilder().build(key -> new AtomicInteger()); 057 } 058 059 /** 060 * Map to track the number of leased compressors. Only used in unit tests, kept null otherwise. 061 */ 062 @Nullable 063 private static LoadingCache<Class<Compressor>, AtomicInteger> compressorCounts = null; 064 065 /** 066 * Map to tracks the number of leased decompressors. Only used in unit tests, kept null otherwise. 067 */ 068 @Nullable 069 private static LoadingCache<Class<Decompressor>, AtomicInteger> decompressorCounts = null; 070 071 /** 072 * Call if you want lease counting to be enabled. Only used in unit tests. 073 */ 074 static void initLeaseCounting() { 075 compressorCounts = createCache(); 076 decompressorCounts = createCache(); 077 } 078 079 private static <T> T borrow(ConcurrentMap<Class<T>, NavigableSet<T>> pool, 080 Class<? extends T> codecClass) { 081 if (codecClass == null) { 082 return null; 083 } 084 085 NavigableSet<T> codecSet = pool.get(codecClass); 086 if (codecSet != null) { 087 // If a copy of the codec is available, pollFirst() will grab one. 088 // If not, it will return null. 089 return codecSet.pollFirst(); 090 } else { 091 return null; 092 } 093 } 094 095 private static <T> boolean payback(ConcurrentMap<Class<T>, NavigableSet<T>> pool, T codec) { 096 if (codec != null) { 097 Class<T> codecClass = ReflectionUtils.getClass(codec); 098 Set<T> codecSet = pool.computeIfAbsent(codecClass, 099 k -> new ConcurrentSkipListSet<>(Comparator.comparingInt(System::identityHashCode))); 100 return codecSet.add(codec); 101 } 102 return false; 103 } 104 105 /** 106 * Copied from hadoop-common without significant modification. 107 */ 108 @SuppressWarnings("unchecked") 109 @edu.umd.cs.findbugs.annotations.SuppressWarnings( 110 value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE", 111 justification = "LoadingCache will compute value if absent") 112 private static <T> int getLeaseCount(LoadingCache<Class<T>, AtomicInteger> usageCounts, 113 Class<? extends T> codecClass) { 114 return usageCounts.get((Class<T>) codecClass).get(); 115 } 116 117 /** 118 * Copied from hadoop-common without significant modification. 119 */ 120 @edu.umd.cs.findbugs.annotations.SuppressWarnings( 121 value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE", 122 justification = "LoadingCache will compute value if absent") 123 private static <T> void updateLeaseCount(LoadingCache<Class<T>, AtomicInteger> usageCounts, 124 T codec, int delta) { 125 if (codec != null && usageCounts != null) { 126 Class<T> codecClass = ReflectionUtils.getClass(codec); 127 usageCounts.get(codecClass).addAndGet(delta); 128 } 129 } 130 131 /** 132 * Get a {@link Compressor} for the given {@link CompressionCodec} from the pool, or get a new one 133 * if the pool is empty. Copied from hadoop-common without significant modification. 134 */ 135 public static Compressor getCompressor(CompressionCodec codec, Configuration conf) { 136 Compressor compressor = borrow(COMPRESSOR_POOL, codec.getCompressorType()); 137 if (compressor == null) { 138 compressor = codec.createCompressor(); 139 LOG.info("Got brand-new compressor [" + codec.getDefaultExtension() + "]"); 140 } else { 141 compressor.reinit(conf); 142 if (LOG.isDebugEnabled()) { 143 LOG.debug("Got recycled compressor"); 144 } 145 } 146 if (compressor != null && !compressor.getClass().isAnnotationPresent(DoNotPool.class)) { 147 updateLeaseCount(compressorCounts, compressor, 1); 148 } 149 return compressor; 150 } 151 152 public static Compressor getCompressor(CompressionCodec codec) { 153 return getCompressor(codec, null); 154 } 155 156 /** 157 * Get a {@link Decompressor} for the given {@link CompressionCodec} from the pool, or get a new 158 * one if the pool is empty. Copied from hadoop-common without significant modification. 159 */ 160 public static Decompressor getDecompressor(CompressionCodec codec) { 161 Decompressor decompressor = borrow(DECOMPRESSOR_POOL, codec.getDecompressorType()); 162 if (decompressor == null) { 163 decompressor = codec.createDecompressor(); 164 LOG.info("Got brand-new decompressor [" + codec.getDefaultExtension() + "]"); 165 } else { 166 if (LOG.isDebugEnabled()) { 167 LOG.debug("Got recycled decompressor"); 168 } 169 } 170 if (decompressor != null && !decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { 171 updateLeaseCount(decompressorCounts, decompressor, 1); 172 } 173 return decompressor; 174 } 175 176 /** 177 * Return the {@link Compressor} to the pool. Copied from hadoop-common without significant 178 * modification. 179 */ 180 public static void returnCompressor(Compressor compressor) { 181 if (compressor == null) { 182 return; 183 } 184 // if the compressor can't be reused, don't pool it. 185 if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) { 186 compressor.end(); 187 return; 188 } 189 compressor.reset(); 190 if (payback(COMPRESSOR_POOL, compressor)) { 191 updateLeaseCount(compressorCounts, compressor, -1); 192 } 193 } 194 195 /** 196 * Return the {@link Decompressor} to the pool. Copied from hadoop-common without significant 197 * modification. 198 */ 199 public static void returnDecompressor(Decompressor decompressor) { 200 if (decompressor == null) { 201 return; 202 } 203 // if the decompressor can't be reused, don't pool it. 204 if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { 205 decompressor.end(); 206 return; 207 } 208 decompressor.reset(); 209 if (payback(DECOMPRESSOR_POOL, decompressor)) { 210 updateLeaseCount(decompressorCounts, decompressor, -1); 211 } 212 } 213 214 /** 215 * Returns the number of leased {@link Compressor}s for this {@link CompressionCodec}. Copied from 216 * hadoop-common without significant modification. 217 */ 218 static int getLeasedCompressorsCount(@Nullable CompressionCodec codec) { 219 if (compressorCounts == null) { 220 throw new IllegalStateException("initLeaseCounting() not called to set up lease counting"); 221 } 222 return (codec == null) ? 0 : getLeaseCount(compressorCounts, codec.getCompressorType()); 223 } 224 225 /** 226 * Returns the number of leased {@link Decompressor}s for this {@link CompressionCodec}. Copied 227 * from hadoop-common without significant modification. 228 */ 229 static int getLeasedDecompressorsCount(@Nullable CompressionCodec codec) { 230 if (decompressorCounts == null) { 231 throw new IllegalStateException("initLeaseCounting() not called to set up lease counting"); 232 } 233 return (codec == null) ? 0 : getLeaseCount(decompressorCounts, codec.getDecompressorType()); 234 } 235}