001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress;
019
020import com.github.benmanes.caffeine.cache.Caffeine;
021import com.github.benmanes.caffeine.cache.LoadingCache;
022import edu.umd.cs.findbugs.annotations.Nullable;
023import java.util.Comparator;
024import java.util.NavigableSet;
025import java.util.Set;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import java.util.concurrent.ConcurrentSkipListSet;
029import java.util.concurrent.atomic.AtomicInteger;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.io.compress.CompressionCodec;
032import org.apache.hadoop.io.compress.Compressor;
033import org.apache.hadoop.io.compress.Decompressor;
034import org.apache.hadoop.io.compress.DoNotPool;
035import org.apache.hadoop.util.ReflectionUtils;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * A global compressor/decompressor pool used to save and reuse (possibly native)
042 * compression/decompression codecs. Copied from the class of the same name in hadoop-common and
043 * augmented to improve borrow/return performance.
044 */
045@InterfaceAudience.Private
046public class CodecPool {
047  private static final Logger LOG = LoggerFactory.getLogger(CodecPool.class);
048
049  private static final ConcurrentMap<Class<Compressor>, NavigableSet<Compressor>> COMPRESSOR_POOL =
050    new ConcurrentHashMap<>();
051
052  private static final ConcurrentMap<Class<Decompressor>,
053    NavigableSet<Decompressor>> DECOMPRESSOR_POOL = new ConcurrentHashMap<>();
054
055  private static <T> LoadingCache<Class<T>, AtomicInteger> createCache() {
056    return Caffeine.newBuilder().build(key -> new AtomicInteger());
057  }
058
059  /**
060   * Map to track the number of leased compressors. Only used in unit tests, kept null otherwise.
061   */
062  @Nullable
063  private static LoadingCache<Class<Compressor>, AtomicInteger> compressorCounts = null;
064
065  /**
066   * Map to tracks the number of leased decompressors. Only used in unit tests, kept null otherwise.
067   */
068  @Nullable
069  private static LoadingCache<Class<Decompressor>, AtomicInteger> decompressorCounts = null;
070
071  /**
072   * Call if you want lease counting to be enabled. Only used in unit tests.
073   */
074  static void initLeaseCounting() {
075    compressorCounts = createCache();
076    decompressorCounts = createCache();
077  }
078
079  private static <T> T borrow(ConcurrentMap<Class<T>, NavigableSet<T>> pool,
080    Class<? extends T> codecClass) {
081    if (codecClass == null) {
082      return null;
083    }
084
085    NavigableSet<T> codecSet = pool.get(codecClass);
086    if (codecSet != null) {
087      // If a copy of the codec is available, pollFirst() will grab one.
088      // If not, it will return null.
089      return codecSet.pollFirst();
090    } else {
091      return null;
092    }
093  }
094
095  private static <T> boolean payback(ConcurrentMap<Class<T>, NavigableSet<T>> pool, T codec) {
096    if (codec != null) {
097      Class<T> codecClass = ReflectionUtils.getClass(codec);
098      Set<T> codecSet = pool.computeIfAbsent(codecClass,
099        k -> new ConcurrentSkipListSet<>(Comparator.comparingInt(System::identityHashCode)));
100      return codecSet.add(codec);
101    }
102    return false;
103  }
104
105  /**
106   * Copied from hadoop-common without significant modification.
107   */
108  @SuppressWarnings("unchecked")
109  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
110      value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE",
111      justification = "LoadingCache will compute value if absent")
112  private static <T> int getLeaseCount(LoadingCache<Class<T>, AtomicInteger> usageCounts,
113    Class<? extends T> codecClass) {
114    return usageCounts.get((Class<T>) codecClass).get();
115  }
116
117  /**
118   * Copied from hadoop-common without significant modification.
119   */
120  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
121      value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE",
122      justification = "LoadingCache will compute value if absent")
123  private static <T> void updateLeaseCount(LoadingCache<Class<T>, AtomicInteger> usageCounts,
124    T codec, int delta) {
125    if (codec != null && usageCounts != null) {
126      Class<T> codecClass = ReflectionUtils.getClass(codec);
127      usageCounts.get(codecClass).addAndGet(delta);
128    }
129  }
130
131  /**
132   * Get a {@link Compressor} for the given {@link CompressionCodec} from the pool, or get a new one
133   * if the pool is empty. Copied from hadoop-common without significant modification.
134   */
135  public static Compressor getCompressor(CompressionCodec codec, Configuration conf) {
136    Compressor compressor = borrow(COMPRESSOR_POOL, codec.getCompressorType());
137    if (compressor == null) {
138      compressor = codec.createCompressor();
139      LOG.info("Got brand-new compressor [" + codec.getDefaultExtension() + "]");
140    } else {
141      compressor.reinit(conf);
142      if (LOG.isDebugEnabled()) {
143        LOG.debug("Got recycled compressor");
144      }
145    }
146    if (compressor != null && !compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
147      updateLeaseCount(compressorCounts, compressor, 1);
148    }
149    return compressor;
150  }
151
152  public static Compressor getCompressor(CompressionCodec codec) {
153    return getCompressor(codec, null);
154  }
155
156  /**
157   * Get a {@link Decompressor} for the given {@link CompressionCodec} from the pool, or get a new
158   * one if the pool is empty. Copied from hadoop-common without significant modification.
159   */
160  public static Decompressor getDecompressor(CompressionCodec codec) {
161    Decompressor decompressor = borrow(DECOMPRESSOR_POOL, codec.getDecompressorType());
162    if (decompressor == null) {
163      decompressor = codec.createDecompressor();
164      LOG.info("Got brand-new decompressor [" + codec.getDefaultExtension() + "]");
165    } else {
166      if (LOG.isDebugEnabled()) {
167        LOG.debug("Got recycled decompressor");
168      }
169    }
170    if (decompressor != null && !decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
171      updateLeaseCount(decompressorCounts, decompressor, 1);
172    }
173    return decompressor;
174  }
175
176  /**
177   * Return the {@link Compressor} to the pool. Copied from hadoop-common without significant
178   * modification.
179   */
180  public static void returnCompressor(Compressor compressor) {
181    if (compressor == null) {
182      return;
183    }
184    // if the compressor can't be reused, don't pool it.
185    if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
186      compressor.end();
187      return;
188    }
189    compressor.reset();
190    if (payback(COMPRESSOR_POOL, compressor)) {
191      updateLeaseCount(compressorCounts, compressor, -1);
192    }
193  }
194
195  /**
196   * Return the {@link Decompressor} to the pool. Copied from hadoop-common without significant
197   * modification.
198   */
199  public static void returnDecompressor(Decompressor decompressor) {
200    if (decompressor == null) {
201      return;
202    }
203    // if the decompressor can't be reused, don't pool it.
204    if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
205      decompressor.end();
206      return;
207    }
208    decompressor.reset();
209    if (payback(DECOMPRESSOR_POOL, decompressor)) {
210      updateLeaseCount(decompressorCounts, decompressor, -1);
211    }
212  }
213
214  /**
215   * Returns the number of leased {@link Compressor}s for this {@link CompressionCodec}. Copied from
216   * hadoop-common without significant modification.
217   */
218  static int getLeasedCompressorsCount(@Nullable CompressionCodec codec) {
219    if (compressorCounts == null) {
220      throw new IllegalStateException("initLeaseCounting() not called to set up lease counting");
221    }
222    return (codec == null) ? 0 : getLeaseCount(compressorCounts, codec.getCompressorType());
223  }
224
225  /**
226   * Returns the number of leased {@link Decompressor}s for this {@link CompressionCodec}. Copied
227   * from hadoop-common without significant modification.
228   */
229  static int getLeasedDecompressorsCount(@Nullable CompressionCodec codec) {
230    if (decompressorCounts == null) {
231      throw new IllegalStateException("initLeaseCounting() not called to set up lease counting");
232    }
233    return (codec == null) ? 0 : getLeaseCount(decompressorCounts, codec.getDecompressorType());
234  }
235}