/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;

/**
 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses BucketCache#ramCache
 * and BucketCache#backingMap in order to determine if a given element is in the cache. The bucket
 * cache can use off-heap memory {@link ByteBufferIOEngine}, mmap
 * {@link ExclusiveMemoryMmapIOEngine}, pmem {@link SharedMemoryMmapIOEngine} or local files
 * {@link FileIOEngine} to store/read the block data.
 * <p>
 * Eviction uses an algorithm similar to that of
 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}.
 * <p>
 * BucketCache is mainly used as a block cache (see
 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with an on-heap
 * BlockCache to decrease CMS GC and heap fragmentation.
 * <p>
 * It can also be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to
 * enlarge cache space via a victim cache.
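 * <p>
 * A minimal usage sketch (the engine string, sizes, key values and {@code someCacheableBlock}
 * below are illustrative placeholders, not recommended values):
 *
 * <pre>{@code
 * BucketCache cache = new BucketCache("offheap", 1024 * 1024 * 1024, 64 * 1024,
 *   new int[] { 64 * 1024 + 1024 }, 3, 64, null);
 * BlockCacheKey key = new BlockCacheKey("example-hfile", 0);
 * cache.cacheBlock(key, someCacheableBlock);
 * Cacheable block = cache.getBlock(key, true, false, true);
 * }</pre>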
 */
@InterfaceAudience.Private
public class BucketCache implements BlockCache, HeapSize {
  private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class);

  /** Priority buckets config */
  static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor";
  static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor";
  static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor";
  static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
  static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
  static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";

  /** Priority buckets */
  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  static final float DEFAULT_MULTI_FACTOR = 0.50f;
  static final float DEFAULT_MEMORY_FACTOR = 0.25f;
  static final float DEFAULT_MIN_FACTOR = 0.85f;

  private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
  private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;

  // Number of buckets to fully free for each bucket size that is still above the free threshold
  private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;

  /** Statistics thread */
  private static final int statThreadPeriod = 5 * 60;

  final static int DEFAULT_WRITER_THREADS = 3;
  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;

  // Store/read block data
  transient final IOEngine ioEngine;

  // Store the block in this map before writing it to cache
  transient final RAMCache ramCache;

  // In this map, store the block's metadata, such as offset and length
  transient Map<BlockCacheKey, BucketEntry> backingMap;

  private AtomicBoolean backingMapValidated = new AtomicBoolean(false);

  /**
   * Map of hFile -> (region, file size). This map tracks all files that have completed prefetch,
   * together with the region they belong to and the total cached size for that region.
   */
  final Map<String, Pair<String, Long>> fullyCachedFiles = new ConcurrentHashMap<>();
  /**
   * Map of region -> total size of the region prefetched on this region server, i.e. the total
   * size of hFiles for this region prefetched on this region server.
   */
  final Map<String, Long> regionCachedSizeMap = new ConcurrentHashMap<>();

  private BucketCachePersister cachePersister;

  /**
   * Flag indicating whether the cache is enabled or not. We shut it off if there are IO errors
   * for some time, so that Bucket IO exceptions/errors don't bring down the HBase server.
   */
  private volatile boolean cacheEnabled;

  /**
   * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other
   * words, the work of adding blocks to the BucketCache is divided up amongst the running
   * WriterThreads. This is done by taking the hash of the cache key modulo the queue count. When a
   * WriterThread runs, it takes whatever has been recently added and 'drains' the entries to the
   * BucketCache. It then updates the ramCache and backingMap accordingly.
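   * <p>
   * Queue selection sketch (this mirrors the hashing done in cacheBlockWithWaitInternal below):
   *
   * <pre>{@code
   * int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
   * BlockingQueue<RAMQueueEntry> queue = writerQueues.get(queueNum);
   * }</pre>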
   */
  transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>();
  transient final WriterThread[] writerThreads;

  /** Volatile boolean to track if free space is in progress or not */
  private volatile boolean freeInProgress = false;
  private transient final Lock freeSpaceLock = new ReentrantLock();

  private final LongAdder realCacheSize = new LongAdder();
  private final LongAdder heapSize = new LongAdder();
  /** Current number of cached elements */
  private final LongAdder blockNumber = new LongAdder();

  /** Cache access count (sequential ID) */
  private final AtomicLong accessCount = new AtomicLong();

  private static final int DEFAULT_CACHE_WAIT_TIME = 50;

  private final BucketCacheStats cacheStats = new BucketCacheStats();
  private final String persistencePath;
  static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false);
  private final long cacheCapacity;
  /** Approximate block size */
  private final long blockSize;

  /** Duration of IO errors tolerated before we disable cache, 1 min as default */
  private final int ioErrorsTolerationDuration;
  // 1 min
  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;

  // Start time of first IO error when reading or writing IO Engine, it will be
  // reset after a successful read/write.
  private volatile long ioErrorStartTime = -1;

  /**
   * A ReentrantReadWriteLock to lock on a particular block identified by offset. The purpose of
   * this is to avoid freeing the block which is being read.
   * <p>
   * The key set of offsets in BucketCache is limited, so a soft reference is the best choice here.
   */
  transient final IdReadWriteLock<Long> offsetLock = new IdReadWriteLock<>(ReferenceType.SOFT);

  NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>(
    Comparator.comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private transient final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());

  // Allocate or free space for the block
  private transient BucketAllocator bucketAllocator;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private float minFactor;

  /**
   * Free this floating point factor of extra blocks when evicting. For example free the number of
   * blocks requested * (1 + extraFreeFactor)
   */
  private float extraFreeFactor;

  /** Single access bucket size */
  private float singleFactor;

  /** Multiple access bucket size */
  private float multiFactor;

  /** In-memory bucket size */
  private float memoryFactor;

  private long bucketcachePersistInterval;

  private static final String FILE_VERIFY_ALGORITHM =
    "hbase.bucketcache.persistent.file.integrity.check.algorithm";
  private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";

  private static final String QUEUE_ADDITION_WAIT_TIME =
    "hbase.bucketcache.queue.addition.waittime";
  private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
  private long queueAdditionWaitTime;
  /**
   * Use a {@link java.security.MessageDigest} digest algorithm to check persistent file
   * integrity; the default algorithm is MD5.
   */
  private String algorithm;

  /* Tracing failed Bucket Cache allocations. */
  private long allocFailLogPrevTs; // time of previous log event for allocation failure.
  private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute.

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
      persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create());
  }

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
    Configuration conf) throws IOException {
    Preconditions.checkArgument(blockSize > 0,
      "BucketCache block size is set to " + blockSize + ", it must be greater than 0");
    this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
    this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
    this.writerThreads = new WriterThread[writerThreadNum];
    long blockNumCapacity = capacity / blockSize;
    if (blockNumCapacity >= Integer.MAX_VALUE) {
      // Enough for about 32TB of cache!
      throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
    }

    this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR);
    this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR);
    this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR);
    this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
    this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
    this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
    this.queueAdditionWaitTime =
      conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
    this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);

    sanityCheckConfigs();

    LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor
      + ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: "
      + singleFactor + ", multiFactor: " + multiFactor + ", memoryFactor: " + memoryFactor);

    this.cacheCapacity = capacity;
    this.persistencePath = persistencePath;
    this.blockSize = blockSize;
    this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;

    this.allocFailLogPrevTs = 0;

    for (int i = 0; i < writerThreads.length; ++i) {
      writerQueues.add(new ArrayBlockingQueue<>(writerQLen));
    }

    assert writerQueues.size() == writerThreads.length;
    this.ramCache = new RAMCache();

    this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);

    if (isCachePersistent()) {
      if (ioEngine instanceof FileIOEngine) {
        startBucketCachePersisterThread();
      }
      try {
        retrieveFromFile(bucketSizes);
      } catch (IOException ioex) {
        LOG.error("Can't restore from file[{}] because of ", persistencePath, ioex);
        backingMap.clear();
        fullyCachedFiles.clear();
        backingMapValidated.set(true);
        bucketAllocator = new BucketAllocator(capacity, bucketSizes);
        regionCachedSizeMap.clear();
      }
    } else {
      bucketAllocator = new BucketAllocator(capacity, bucketSizes);
    }
    final String threadName = Thread.currentThread().getName();
    this.cacheEnabled = true;
    for (int i = 0; i < writerThreads.length; ++i) {
      writerThreads[i] = new WriterThread(writerQueues.get(i));
      writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
      writerThreads[i].setDaemon(true);
    }
    startWriterThreads();

    // Run the statistics thread periodically to print the cache statistics log
    // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
    // every five minutes.
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod,
      statThreadPeriod, TimeUnit.SECONDS);
    LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity="
      + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize)
      + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath="
      + persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName());
  }

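  /**
   * Validates the priority bucket factors read in the constructor. As a sketch of a configuration
   * that passes these checks (the values below just restate the defaults and the constraint that
   * the three priority factors sum to 1.0):
   *
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.setFloat(SINGLE_FACTOR_CONFIG_NAME, 0.25f);
   * conf.setFloat(MULTI_FACTOR_CONFIG_NAME, 0.50f);
   * conf.setFloat(MEMORY_FACTOR_CONFIG_NAME, 0.25f);
   * conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.85f);
   * conf.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.95f);
   * }</pre>
   */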
  private void sanityCheckConfigs() {
    Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0,
      ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0,
      MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(minFactor <= acceptableFactor,
      MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME);
    Preconditions.checkArgument(extraFreeFactor >= 0,
      EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than or equal to 0.0");
    Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0,
      SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0,
      MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0,
      MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1,
      SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and "
        + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0");
  }

  /**
   * Called by the constructor to start the writer threads. Used by tests that need to override
   * starting the threads.
   */
  protected void startWriterThreads() {
    for (WriterThread thread : writerThreads) {
      thread.start();
    }
  }

  void startBucketCachePersisterThread() {
    LOG.info("Starting BucketCachePersisterThread");
    cachePersister = new BucketCachePersister(this, bucketcachePersistInterval);
    cachePersister.setDaemon(true);
    cachePersister.start();
  }

  boolean isCacheEnabled() {
    return this.cacheEnabled;
  }

  @Override
  public long getMaxSize() {
    return this.cacheCapacity;
  }

  public String getIoEngine() {
    return ioEngine.toString();
  }

  /**
   * Get the IOEngine from the IO engine name
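   * <p>
   * Supported name formats, as handled below (the paths are illustrative):
   *
   * <pre>
   *   offheap
   *   file:/path/to/cache.data
   *   files:/path/one,/path/two
   *   mmap:/path/to/cache.data
   *   pmem:/path/to/cache.data
   * </pre>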
   * @return the IOEngine
   */
  private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath)
    throws IOException {
    if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
      // To keep usage simple, the documentation only mentions the 'files:' prefix, whether one or
      // multiple file(s) are configured, but we also support 'file:' for compatibility.
      String[] filePaths =
        ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER);
      return new FileIOEngine(capacity, persistencePath != null, filePaths);
    } else if (ioEngineName.startsWith("offheap")) {
      return new ByteBufferIOEngine(capacity);
    } else if (ioEngineName.startsWith("mmap:")) {
      return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
    } else if (ioEngineName.startsWith("pmem:")) {
      // This mode of bucket cache creates an IOEngine over a file on the persistent memory
      // device. Since the persistent memory device has its own address space, the contents
      // mapped to this address space do not get swapped out like in the case of mmapping
      // on to DRAM. Hence the cells created out of the hfile blocks in the pmem bucket cache
      // can be referred to directly without having to copy them onheap. Once the RPC is done,
      // the blocks can be returned as in the case of ByteBufferIOEngine.
      return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
    } else {
      throw new IllegalArgumentException(
        "Don't understand io engine name for cache - prefix with file:, files:, mmap:, pmem: or offheap");
    }
  }

  public boolean isCachePersistenceEnabled() {
    return persistencePath != null;
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey block's cache key
   * @param buf      block buffer
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    cacheBlock(cacheKey, buf, false);
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey   block's cache key
   * @param cachedItem block buffer
   * @param inMemory   if block is in-memory
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
    cacheBlockWithWait(cacheKey, cachedItem, inMemory, false);
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey   block's cache key
   * @param cachedItem block buffer
   * @param inMemory   if block is in-memory
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
    boolean waitWhenCache) {
    cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0);
  }

  /**
   * Cache the block to ramCache
   * @param cacheKey   block's cache key
   * @param cachedItem block buffer
   * @param inMemory   if block is in-memory
   * @param wait       if true, blocking wait when queue is full
   */
  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
    boolean wait) {
    if (cacheEnabled) {
      if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
        if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) {
          BucketEntry bucketEntry = backingMap.get(cacheKey);
          if (bucketEntry != null && bucketEntry.isRpcRef()) {
            // avoid replace when there are RPC refs for the bucket entry in bucket cache
            return;
          }
          cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
        }
      } else {
        cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
      }
    }
  }

  protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
    return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock);
  }

  protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
    boolean inMemory, boolean wait) {
    if (!cacheEnabled) {
      return;
    }
    if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) {
      cacheKey.setBlockType(cachedItem.getBlockType());
    }
    LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
    // Stuff the entry into the RAM cache so it can get drained to the persistent store
    RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(),
      inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine);
    /**
     * Don't use ramCache.put(cacheKey, re) here, because there may be an existing entry with the
     * same key in ramCache, and the heap size of the bucket cache needs to be updated when an
     * entry in ramCache is replaced. But the WriterThread also removes entries from ramCache and
     * updates the heap size; if we used ramCache.put(), it's possible that the entry removed by
     * the WriterThread is not the correct one, and then the heap size accounting gets messed up
     * (HBASE-20789).
     */
    if (ramCache.putIfAbsent(cacheKey, re) != null) {
      return;
    }
    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
    boolean successfulAddition = false;
    if (wait) {
      try {
        successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    } else {
      successfulAddition = bq.offer(re);
    }
    if (!successfulAddition) {
      ramCache.remove(cacheKey);
      cacheStats.failInsert();
    } else {
      this.blockNumber.increment();
      this.heapSize.add(cachedItem.heapSize());
    }
  }

  /**
   * Get the buffer of the block with the specified key.
   * @param key                block's cache key
   * @param caching            true if the caller caches blocks on cache misses
   * @param repeat             Whether this is a repeat lookup for the same block
   * @param updateCacheMetrics Whether we should update cache metrics or not
   * @return buffer of specified cache key, or null if not in cache
   */
  @Override
  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
    boolean updateCacheMetrics) {
    if (!cacheEnabled) {
      return null;
    }
    RAMQueueEntry re = ramCache.get(key);
    if (re != null) {
      if (updateCacheMetrics) {
        cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
      }
      re.access(accessCount.incrementAndGet());
      return re.getData();
    }
    BucketEntry bucketEntry = backingMap.get(key);
    if (bucketEntry != null) {
      long start = System.nanoTime();
      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
      try {
        lock.readLock().lock();
        // We can not read here even if backingMap does contain the given key because its offset
        // may have changed. If we locked the BlockCacheKey instead of the offset, we could only
        // check existence here.
        if (bucketEntry.equals(backingMap.get(key))) {
          // Read the block from the IOEngine based on the bucketEntry's offset and length. NOTICE:
          // the block will use the refCnt of bucketEntry, which means if two HFileBlocks map to
          // the same BucketEntry, then all three will share the same refCnt.
          Cacheable cachedBlock = ioEngine.read(bucketEntry);
          if (ioEngine.usesSharedMemory()) {
            // If the IOEngine uses shared memory, cachedBlock and BucketEntry will share the
            // same RefCnt; retain here in order to count the number of RPC references
            cachedBlock.retain();
          }
          // Update the cache statistics.
          if (updateCacheMetrics) {
            cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
            cacheStats.ioHit(System.nanoTime() - start);
          }
          bucketEntry.access(accessCount.incrementAndGet());
          if (this.ioErrorStartTime > 0) {
            ioErrorStartTime = -1;
          }
          return cachedBlock;
        }
      } catch (HBaseIOException hioex) {
        // When using the file io engine persistent cache,
        // the cache map state might differ from the actual cache. If we reach this block,
        // we should remove the cache key entry from the backing map
        backingMap.remove(key);
        removeFileFromPrefetch(key.getHfileName());
        LOG.debug("Failed to fetch block for cache key: {}.", key, hioex);
      } catch (IOException ioex) {
        LOG.error("Failed reading block " + key + " from bucket cache", ioex);
        checkIOErrorIsTolerated();
      } finally {
        lock.readLock().unlock();
      }
    }
    if (!repeat && updateCacheMetrics) {
      cacheStats.miss(caching, key.isPrimary(), key.getBlockType());
    }
    return null;
  }

  /**
   * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap}
   */
  void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
    boolean evictedByEvictionProcess) {
    bucketEntry.markAsEvicted();
    blocksByHFile.remove(cacheKey);
    if (decrementBlockNumber) {
      this.blockNumber.decrement();
      if (ioEngine.isPersistent()) {
        removeFileFromPrefetch(cacheKey.getHfileName());
      }
    }
    if (evictedByEvictionProcess) {
      cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
    }
    if (ioEngine.isPersistent()) {
      setCacheInconsistent(true);
    }
  }

  /**
   * Actually free the {@link BucketEntry}, which may only be invoked when the
   * {@link BucketEntry#refCnt} becomes 0.
   */
  void freeBucketEntry(BucketEntry bucketEntry) {
    bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
    realCacheSize.add(-1 * bucketEntry.getLength());
  }

  /**
   * Try to evict the block from {@link BlockCache} by force. We'll call this in a few cases:<br>
   * 1. Close an HFile, and clear all cached blocks. <br>
   * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br>
   * <p>
   * Firstly, we'll try to remove the block from RAMCache, and then try to evict it from
   * backingMap. Here we evict the block from backingMap immediately, but only free the reference
   * from the bucket cache by calling {@link BucketEntry#markedAsEvicted}. If there are still RPCs
   * referring to this block, it can only be de-allocated when all of them release the block.
   * <p>
   * NOTICE: we need to grab the write offset lock first before releasing the reference from the
   * bucket cache. If we don't, we may read a {@link BucketEntry} with refCnt = 0 in
   * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, which is a memory
   * leak.
   * @param cacheKey Block to evict
   * @return true to indicate whether we've evicted successfully or not.
   */
  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    return doEvictBlock(cacheKey, null, false);
  }

  /**
   * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap}
   * and {@link BucketCache#ramCache}. <br/>
   * NOTE: When evicting from {@link BucketCache#backingMap}, only the matched
   * {@link BlockCacheKey} and {@link BucketEntry} can be removed.
   * @param cacheKey    {@link BlockCacheKey} to evict.
   * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict.
   * @return true to indicate whether we've evicted successfully or not.
   */
  private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
    boolean evictedByEvictionProcess) {
    if (!cacheEnabled) {
      return false;
    }
    boolean existedInRamCache = removeFromRamCache(cacheKey);
    if (bucketEntry == null) {
      bucketEntry = backingMap.get(cacheKey);
    }
    final BucketEntry bucketEntryToUse = bucketEntry;

    if (bucketEntryToUse == null) {
      if (existedInRamCache && evictedByEvictionProcess) {
        cacheStats.evicted(0, cacheKey.isPrimary());
      }
      return existedInRamCache;
    } else {
      return bucketEntryToUse.withWriteLock(offsetLock, () -> {
        if (backingMap.remove(cacheKey, bucketEntryToUse)) {
          LOG.debug("removed key {} from back map with offset lock {} in the evict process",
            cacheKey, bucketEntryToUse.offset());
          blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess);
          return true;
        }
        return false;
      });
    }
  }

  /**
   * <pre>
   * Create the {@link Recycler} for {@link BucketEntry#refCnt}, which would be used as
   * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}.
   * NOTE: for {@link BucketCache#getBlock}, the {@link RefCnt#recycler} of {@link HFileBlock#buf}
   * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different:
   * 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap},
   *   it is the return value of current {@link BucketCache#createRecycler} method.
   *
   * 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache},
   *   it is {@link ByteBuffAllocator#putbackBuffer}.
   * </pre>
   */
  private Recycler createRecycler(final BucketEntry bucketEntry) {
    return () -> {
      freeBucketEntry(bucketEntry);
      return;
    };
  }

  /**
   * NOTE: This method is only for test.
   */
  public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) {
    BucketEntry bucketEntry = backingMap.get(blockCacheKey);
    if (bucketEntry == null) {
      return false;
    }
    return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry);
  }

  /**
   * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if
   * {@link BucketEntry#isRpcRef} is false. <br/>
   * NOTE: When evicting from {@link BucketCache#backingMap}, only the matched
   * {@link BlockCacheKey} and {@link BucketEntry} can be removed.
   * @param blockCacheKey {@link BlockCacheKey} to evict.
   * @param bucketEntry   {@link BucketEntry} matched {@link BlockCacheKey} to evict.
   * @return true to indicate whether we've evicted successfully or not.
   */
  boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) {
    if (!bucketEntry.isRpcRef()) {
      return doEvictBlock(blockCacheKey, bucketEntry, true);
    }
    return false;
  }

  protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
    return ramCache.remove(cacheKey, re -> {
      if (re != null) {
        this.blockNumber.decrement();
        this.heapSize.add(-1 * re.getData().heapSize());
      }
    });
  }

  public boolean isCacheInconsistent() {
    return isCacheInconsistent.get();
  }

  public void setCacheInconsistent(boolean setCacheInconsistent) {
    isCacheInconsistent.set(setCacheInconsistent);
  }

  /*
   * Statistics thread. Periodically output cache statistics to the log.
   */
  private static class StatisticsThread extends Thread {
    private final BucketCache bucketCache;

    public StatisticsThread(BucketCache bucketCache) {
      super("BucketCacheStatsThread");
      setDaemon(true);
      this.bucketCache = bucketCache;
    }

    @Override
    public void run() {
      bucketCache.logStats();
    }
  }

  public void logStats() {
    long totalSize = bucketAllocator.getTotalSize();
    long usedSize = bucketAllocator.getUsedSize();
    long freeSize = totalSize - usedSize;
    long cacheSize = getRealCacheSize();
    LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + "totalSize="
      + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", "
      + "usedSize=" + StringUtils.byteDesc(usedSize) + ", " + "cacheSize="
      + StringUtils.byteDesc(cacheSize) + ", " + "accesses=" + cacheStats.getRequestCount() + ", "
      + "hits=" + cacheStats.getHitCount() + ", " + "IOhitsPerSecond="
      + cacheStats.getIOHitsPerSecond() + ", " + "IOTimePerHit="
      + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " + "hitRatio="
      + (cacheStats.getHitCount() == 0
        ? "0,"
        : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", "))
      + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits="
      + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio="
      + (cacheStats.getHitCachingCount() == 0
        ? "0,"
        : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", "))
      + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted="
      + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction()
      + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount());
    cacheStats.reset();

    bucketAllocator.logDebugStatistics();
  }

  public long getRealCacheSize() {
    return this.realCacheSize.sum();
  }

  public long acceptableSize() {
    return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor);
  }

  long getPartitionSize(float partitionFactor) {
    return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor);
  }

  /**
   * Return the count of bucketSizeInfos that still need free space
   */
  private int bucketSizesAboveThresholdCount(float minFactor) {
    BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
    int fullCount = 0;
    for (int i = 0; i < stats.length; i++) {
      long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
      freeGoal = Math.max(freeGoal, 1);
      if (stats[i].freeCount() < freeGoal) {
        fullCount++;
      }
    }
    return fullCount;
  }

  /**
   * This method will find the buckets that are minimally occupied and are not reference counted,
   * and will free them completely, without any constraint on the access times of the elements. As
   * a result it will completely free at most the number of buckets passed in; sometimes it might
   * free fewer due to changing refCounts.
   * @param completelyFreeBucketsNeeded number of buckets to free
   **/
  private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
    if (completelyFreeBucketsNeeded != 0) {
      // First we will build a set where the offsets are reference counted, usually
      // this set is small around O(Handler Count) unless something else is wrong
      Set<Integer> inUseBuckets = new HashSet<>();
      backingMap.forEach((k, be) -> {
        if (be.isRpcRef()) {
          inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset()));
        }
      });
      Set<Integer> candidateBuckets =
        bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded);
      for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
        if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) {
          evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue());
        }
      }
    }
  }

  /**
   * Free space if the used size reaches acceptableSize() or a block of one size couldn't be
   * allocated. When freeing space, we use the LRU algorithm and ensure that some blocks are
   * evicted.
   * @param why Why we are being called
   */
  void freeSpace(final String why) {
    // Ensure only one freeSpace operation is in progress at a time
    if (!freeSpaceLock.tryLock()) {
      return;
    }
    try {
      freeInProgress = true;
      long bytesToFreeWithoutExtra = 0;
      // Calculate bytes to free for each bucketSizeInfo
      StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null;
      BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
      long[] bytesToFreeForBucket = new long[stats.length];
      for (int i = 0; i < stats.length; i++) {
        bytesToFreeForBucket[i] = 0;
        long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
        freeGoal = Math.max(freeGoal, 1);
        if (stats[i].freeCount() < freeGoal) {
          bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
          bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
          if (msgBuffer != null) {
            msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
              + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
          }
        }
      }
      if (msgBuffer != null) {
        msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
      }

      if (bytesToFreeWithoutExtra <= 0) {
        return;
      }
      long currentSize = bucketAllocator.getUsedSize();
      long totalSize = bucketAllocator.getTotalSize();
      if (LOG.isDebugEnabled() && msgBuffer != null) {
        LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString()
          + " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize="
          + StringUtils.byteDesc(realCacheSize.sum()) + ", total="
          + StringUtils.byteDesc(totalSize));
      }

      long bytesToFreeWithExtra =
        (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor));

      // Instantiate priority buckets
      BucketEntryGroup bucketSingle =
        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor));
      BucketEntryGroup bucketMulti =
        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(multiFactor));
      BucketEntryGroup bucketMemory =
        new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor));

      // Scan the entire map, putting each bucket entry into the appropriate bucket entry
      // group
      for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
        switch (bucketEntryWithKey.getValue().getPriority()) {
          case SINGLE: {
            bucketSingle.add(bucketEntryWithKey);
            break;
          }
          case MULTI: {
            bucketMulti.add(bucketEntryWithKey);
            break;
          }
          case MEMORY: {
            bucketMemory.add(bucketEntryWithKey);
            break;
          }
        }
      }

      PriorityQueue<BucketEntryGroup> bucketQueue =
        new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow));

      bucketQueue.add(bucketSingle);
      bucketQueue.add(bucketMulti);
      bucketQueue.add(bucketMemory);

      int remainingBuckets = bucketQueue.size();
      long bytesFreed = 0;

      BucketEntryGroup bucketGroup;
      while ((bucketGroup = bucketQueue.poll()) != null) {
        long overflow = bucketGroup.overflow();
        if (overflow > 0) {
          long bucketBytesToFree =
            Math.min(overflow, (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
          bytesFreed += bucketGroup.free(bucketBytesToFree);
        }
        remainingBuckets--;
      }

      // Check and free if there are buckets that still need freeing of space
      if (bucketSizesAboveThresholdCount(minFactor) > 0) {
        bucketQueue.clear();
        remainingBuckets = 3;

        bucketQueue.add(bucketSingle);
        bucketQueue.add(bucketMulti);
        bucketQueue.add(bucketMemory);

        while ((bucketGroup = bucketQueue.poll()) != null) {
          long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
          bytesFreed += bucketGroup.free(bucketBytesToFree);
          remainingBuckets--;
        }
      }

      // Even after the above free we might still need freeing because of the
      // de-fragmentation of the buckets (also called the Slab Calcification problem), i.e.
      // there might be some buckets where the occupancy is very sparse and thus they are not
      // yielding free space for the other bucket sizes. The fix for this is to evict some
      // of the buckets; we do this by evicting the buckets that are least filled.
      freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * bucketSizesAboveThresholdCount(1.0f));

      if (LOG.isDebugEnabled()) {
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.debug("Bucket cache free space completed; " + "freed="
          + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc(totalSize)
          + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi="
          + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory));
      }

    } catch (Throwable t) {
      LOG.warn("Failed freeing space", t);
    } finally {
      cacheStats.evict();
      freeInProgress = false;
      freeSpaceLock.unlock();
    }
  }

  // This handles flushing the RAM cache to IOEngine.
  class WriterThread extends Thread {
    private final BlockingQueue<RAMQueueEntry> inputQueue;
    private volatile boolean writerEnabled = true;
    private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE);

    WriterThread(BlockingQueue<RAMQueueEntry> queue) {
      super("BucketCacheWriterThread");
      this.inputQueue = queue;
    }

    // Used for test
    void disableWriter() {
      this.writerEnabled = false;
    }

    @Override
    public void run() {
      List<RAMQueueEntry> entries = new ArrayList<>();
      try {
        while (cacheEnabled && writerEnabled) {
          try {
            try {
              // Blocks
              entries = getRAMQueueEntries(inputQueue, entries);
            } catch (InterruptedException ie) {
              if (!cacheEnabled || !writerEnabled) {
                break;
              }
            }
            doDrain(entries, metaBuff);
          } catch (Exception ioe) {
            LOG.error("WriterThread encountered error", ioe);
          }
        }
      } catch (Throwable t) {
        LOG.warn("Failed doing drain", t);
      }
      LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
    }
  }

  /**
   * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
   * cache with a new block for the same cache key. There's a corner case: one thread caches a
   * block in ramCache, copies it to the io-engine and adds a bucket entry to backingMap. Caching
   * another new block with the same cache key does the same thing for that cache key, so if we do
   * not evict the previous bucket entry, a memory leak happens because the previous bucketEntry
   * is gone but the bucketAllocator does not free its memory.
   * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
   *      cacheKey, Cacheable newBlock)
   * @param key         Block cache key
   * @param bucketEntry Bucket entry to put into backingMap.
   */
  protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
    BucketEntry previousEntry = backingMap.put(key, bucketEntry);
    blocksByHFile.add(key);
    if (previousEntry != null && previousEntry != bucketEntry) {
      previousEntry.withWriteLock(offsetLock, () -> {
        blockEvicted(key, previousEntry, false, false);
        return null;
      });
    }
  }

  /**
   * Prepare and return a warning message for Bucket Allocator Exception
   * @param fle The exception
   * @param re  The RAMQueueEntry for which the exception was thrown.
   * @return A warning message created from the input RAMQueueEntry object.
   */
  private static String getAllocationFailWarningMessage(final BucketAllocatorException fle,
    final RAMQueueEntry re) {
    final StringBuilder sb = new StringBuilder();
    sb.append("Most recent failed allocation after ");
    sb.append(ALLOCATION_FAIL_LOG_TIME_PERIOD);
    sb.append(" ms;");
    if (re != null) {
      if (re.getData() instanceof HFileBlock) {
        final HFileContext fileContext = ((HFileBlock) re.getData()).getHFileContext();
        final String columnFamily = Bytes.toString(fileContext.getColumnFamily());
        final String tableName = Bytes.toString(fileContext.getTableName());
        if (tableName != null) {
          sb.append(" Table: ");
          sb.append(tableName);
        }
        if (columnFamily != null) {
          sb.append(" CF: ");
          sb.append(columnFamily);
        }
        sb.append(" HFile: ");
        if (fileContext.getHFileName() != null) {
          sb.append(fileContext.getHFileName());
        } else {
          sb.append(re.getKey());
        }
      } else {
        sb.append(" HFile: ");
        sb.append(re.getKey());
      }
    }
    sb.append(" Message: ");
    sb.append(fle.getMessage());
    return sb.toString();
  }

  /**
   * Flush the entries in ramCache to the IOEngine and add bucket entries to backingMap. Process
   * all entries passed in, even on failure, making sure to remove them from ramCache; otherwise
   * we'll never undo the references and we'll OOME.
   * @param entries Presumes list passed in here will be processed by this invocation only. No
   *                interference expected.
   */
  void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
    if (entries.isEmpty()) {
      return;
    }
    // This method is a little hard to follow. We run through the passed in entries and for each
    // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
    // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
    // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
    // filling ramCache. We do the clean up by again running through the passed in entries
    // doing extra work when we find a non-null bucketEntries corresponding entry.
    final int size = entries.size();
    BucketEntry[] bucketEntries = new BucketEntry[size];
    // Index updated inside loop if success or if we can't succeed. We retry if cache is full
    // when we go to add an entry by going around the loop again without upping the index.
    int index = 0;
    while (cacheEnabled && index < size) {
      RAMQueueEntry re = null;
      try {
        re = entries.get(index);
        if (re == null) {
          LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
          index++;
          continue;
        }
        // Reset the position for reuse.
        // It should be guaranteed that the data in the metaBuff has been transferred to the
        // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already
        // transferred with our current IOEngines. Should take care, when we have new kinds of
        // IOEngine in the future.
        metaBuff.clear();
        BucketEntry bucketEntry =
          re.writeToCache(ioEngine, bucketAllocator, realCacheSize, this::createRecycler, metaBuff);
        // Successfully added. Up index and add bucketEntry. Clear io exceptions.
        bucketEntries[index] = bucketEntry;
        if (ioErrorStartTime > 0) {
          ioErrorStartTime = -1;
        }
        index++;
      } catch (BucketAllocatorException fle) {
        long currTs = EnvironmentEdgeManager.currentTime();
        cacheStats.allocationFailed(); // Record the warning.
        if (
          allocFailLogPrevTs == 0 || (currTs - allocFailLogPrevTs) > ALLOCATION_FAIL_LOG_TIME_PERIOD
        ) {
          LOG.warn(getAllocationFailWarningMessage(fle, re));
          allocFailLogPrevTs = currTs;
        }
        // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
        bucketEntries[index] = null;
        index++;
      } catch (CacheFullException cfe) {
        // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
        if (!freeInProgress) {
          freeSpace("Full!");
        } else {
          Thread.sleep(50);
        }
      } catch (IOException ioex) {
        // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
        LOG.error("Failed writing to bucket cache", ioex);
        checkIOErrorIsTolerated();
      }
    }

    // Make sure data pages are written on media before we update maps.
    try {
      ioEngine.sync();
    } catch (IOException ioex) {
      LOG.error("Failed syncing IO engine", ioex);
      checkIOErrorIsTolerated();
      // Since we failed sync, free the blocks in bucket allocator
      for (int i = 0; i < entries.size(); ++i) {
        BucketEntry bucketEntry = bucketEntries[i];
        if (bucketEntry != null) {
          bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
          bucketEntries[i] = null;
        }
      }
    }

    // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
    // success or error.
    for (int i = 0; i < size; ++i) {
      BlockCacheKey key = entries.get(i).getKey();
      // Only add if non-null entry.
      if (bucketEntries[i] != null) {
        putIntoBackingMap(key, bucketEntries[i]);
        if (ioEngine.isPersistent()) {
          setCacheInconsistent(true);
        }
      }
      // Always remove from ramCache even if we failed adding it to the block cache above.
      boolean existed = ramCache.remove(key, re -> {
        if (re != null) {
          heapSize.add(-1 * re.getData().heapSize());
        }
      });
      if (!existed && bucketEntries[i] != null) {
        // Block should have already been evicted. Remove it and free space.
        final BucketEntry bucketEntry = bucketEntries[i];
        bucketEntry.withWriteLock(offsetLock, () -> {
          if (backingMap.remove(key, bucketEntry)) {
            blockEvicted(key, bucketEntry, false, false);
          }
          return null;
        });
      }
    }

    long used = bucketAllocator.getUsedSize();
    if (used > acceptableSize()) {
      freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
    }
    return;
  }
1248
1249  /**
1250   * Blocks until elements available in {@code q} then tries to grab as many as possible before
1251   * returning.
1252   * @param receptacle Where to stash the elements taken from queue. We clear before we use it just
1253   *                   in case.
1254   * @param q          The queue to take from.
1255   * @return {@code receptacle} laden with elements taken from the queue or empty if none found.
1256   */
1257  static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q,
1258    List<RAMQueueEntry> receptacle) throws InterruptedException {
    // Clear sets all entries to null and sets size to 0. We retain the backing allocation; presume
    // that is fine even if the list grew to accommodate thousands of entries.
1261    receptacle.clear();
1262    receptacle.add(q.take());
1263    q.drainTo(receptacle);
1264    return receptacle;
1265  }
1266
1267  /**
1268   * @see #retrieveFromFile(int[])
1269   */
1270  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
1271      justification = "false positive, try-with-resources ensures close is called.")
1272  void persistToFile() throws IOException {
1273    LOG.debug("Thread {} started persisting bucket cache to file",
1274      Thread.currentThread().getName());
1275    if (!isCachePersistent()) {
1276      throw new IOException("Attempt to persist non-persistent cache mappings!");
1277    }
1278    File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime());
1279    try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) {
1280      fos.write(ProtobufMagic.PB_MAGIC);
1281      BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
1282    } catch (IOException e) {
1283      LOG.error("Failed to persist bucket cache to file", e);
1284      throw e;
1285    }
1286    LOG.debug("Thread {} finished persisting bucket cache to file, renaming",
1287      Thread.currentThread().getName());
1288    if (!tempPersistencePath.renameTo(new File(persistencePath))) {
1289      LOG.warn("Failed to commit cache persistent file. We might lose cached blocks if "
1290        + "RS crashes/restarts before we successfully checkpoint again.");
1291    }
1292  }
1293
1294  public boolean isCachePersistent() {
1295    return ioEngine.isPersistent() && persistencePath != null;
1296  }
1297
1298  public Map<String, Long> getRegionCachedInfo() {
1299    return Collections.unmodifiableMap(regionCachedSizeMap);
1300  }
1301
1302  /**
1303   * @see #persistToFile()
1304   */
1305  private void retrieveFromFile(int[] bucketSizes) throws IOException {
1306    LOG.info("Started retrieving bucket cache from file");
1307    File persistenceFile = new File(persistencePath);
1308    if (!persistenceFile.exists()) {
1309      LOG.warn("Persistence file missing! "
1310        + "It's ok if it's first run after enabling persistent cache.");
1311      bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
1312      blockNumber.add(backingMap.size());
1313      backingMapValidated.set(true);
1314      return;
1315    }
1316    assert !cacheEnabled;
1317
1318    try (FileInputStream in = new FileInputStream(persistenceFile)) {
1319      int pblen = ProtobufMagic.lengthOfPBMagic();
1320      byte[] pbuf = new byte[pblen];
1321      IOUtils.readFully(in, pbuf, 0, pblen);
1322      if (!ProtobufMagic.isPBMagicPrefix(pbuf)) {
1323        // In 3.0 we have enough flexibility to dump the old cache data.
1324        // TODO: In 2.x line, this might need to be filled in to support reading the old format
1325        throw new IOException(
1326          "Persistence file does not start with protobuf magic number. " + persistencePath);
1327      }
1328      parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
1329      bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
1330      blockNumber.add(backingMap.size());
1331      LOG.info("Bucket cache retrieved from file successfully");
1332    }
1333  }
1334
1335  private void updateRegionSizeMapWhileRetrievingFromFile() {
1336    // Update the regionCachedSizeMap with the region size while restarting the region server
1337    if (LOG.isDebugEnabled()) {
1338      LOG.debug("Updating region size map after retrieving cached file list");
1339      dumpPrefetchList();
1340    }
1341    regionCachedSizeMap.clear();
1342    fullyCachedFiles.forEach((hFileName, hFileSize) -> {
1343      // Get the region name for each file
1344      String regionEncodedName = hFileSize.getFirst();
1345      long cachedFileSize = hFileSize.getSecond();
1346      regionCachedSizeMap.merge(regionEncodedName, cachedFileSize,
1347        (oldpf, fileSize) -> oldpf + fileSize);
1348    });
1349  }
1350
1351  private void dumpPrefetchList() {
1352    for (Map.Entry<String, Pair<String, Long>> outerEntry : fullyCachedFiles.entrySet()) {
1353      LOG.debug("Cached File Entry:<{},<{},{}>>", outerEntry.getKey(),
1354        outerEntry.getValue().getFirst(), outerEntry.getValue().getSecond());
1355    }
1356  }
1357
1358  /**
1359   * Create an input stream that deletes the file after reading it. Use in try-with-resources to
1360   * avoid this pattern where an exception thrown from a finally block may mask earlier exceptions:
1361   *
1362   * <pre>
1363   *   File f = ...
1364   *   try (FileInputStream fis = new FileInputStream(f)) {
1365   *     // use the input stream
1366   *   } finally {
1367   *     if (!f.delete()) throw new IOException("failed to delete");
1368   *   }
1369   * </pre>
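   *
   * With this helper the same logic collapses to a single try-with-resources block (an
   * illustrative sketch; how the stream is consumed is up to the caller):
   *
   * <pre>
   *   try (FileInputStream in = deleteFileOnClose(f)) {
   *     // read from the stream; the file is deleted once close() succeeds
   *   }
   * </pre>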
1370   *
1371   * @param file the file to read and delete
1372   * @return a FileInputStream for the given file
1373   * @throws IOException if there is a problem creating the stream
1374   */
1375  private FileInputStream deleteFileOnClose(final File file) throws IOException {
1376    return new FileInputStream(file) {
1377      private File myFile;
1378
1379      private FileInputStream init(File file) {
1380        myFile = file;
1381        return this;
1382      }
1383
1384      @Override
1385      public void close() throws IOException {
        // close() will be called during try-with-resources and it may also be called by the
        // finalizer thread during GC. To avoid double-freeing the resource, set myFile to null
        // after the first call.
1389        if (myFile == null) {
1390          return;
1391        }
1392
1393        super.close();
1394        if (!myFile.delete()) {
1395          throw new IOException("Failed deleting persistence file " + myFile.getAbsolutePath());
1396        }
1397        myFile = null;
1398      }
1399    }.init(file);
1400  }
1401
1402  private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass)
1403    throws IOException {
1404    if (capacitySize != cacheCapacity) {
1405      throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc(capacitySize)
1406        + ", expected: " + StringUtils.byteDesc(cacheCapacity));
1407    }
1408    if (!ioEngine.getClass().getName().equals(ioclass)) {
1409      throw new IOException("Class name for IO engine mismatch: " + ioclass + ", expected:"
1410        + ioEngine.getClass().getName());
1411    }
1412    if (!backingMap.getClass().getName().equals(mapclass)) {
1413      throw new IOException("Class name for cache map mismatch: " + mapclass + ", expected:"
1414        + backingMap.getClass().getName());
1415    }
1416  }
1417
1418  private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
1419    Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair =
1420      BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
1421        this::createRecycler);
1422    backingMap = pair.getFirst();
1423    blocksByHFile = pair.getSecond();
1424    fullyCachedFiles.clear();
1425    fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap()));
1426    if (proto.hasChecksum()) {
1427      try {
1428        ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
1429          algorithm);
1430        backingMapValidated.set(true);
1431      } catch (IOException e) {
1432        LOG.warn("Checksum for cache file failed. "
1433          + "We need to validate each cache key in the backing map. "
1434          + "This may take some time, so we'll do it in a background thread,");
1435        Runnable cacheValidator = () -> {
1436          while (bucketAllocator == null) {
1437            try {
1438              Thread.sleep(50);
1439            } catch (InterruptedException ex) {
1440              throw new RuntimeException(ex);
1441            }
1442          }
1443          long startTime = EnvironmentEdgeManager.currentTime();
1444          int totalKeysOriginally = backingMap.size();
1445          for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
1446            try {
1447              ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
1448            } catch (IOException e1) {
1449              LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey());
1450              evictBlock(keyEntry.getKey());
1451              removeFileFromPrefetch(keyEntry.getKey().getHfileName());
1452            }
1453          }
1454          backingMapValidated.set(true);
1455          LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.",
1456            totalKeysOriginally, backingMap.size(),
1457            (EnvironmentEdgeManager.currentTime() - startTime));
1458        };
1459        Thread t = new Thread(cacheValidator);
1460        t.setDaemon(true);
1461        t.start();
1462      }
1463    } else {
      // If there is no checksum, the persistence file is in the old format.
      LOG.info("Persistence file is in the old format; it does not support verifying file "
        + "integrity!");
1466      backingMapValidated.set(true);
1467    }
1468    updateRegionSizeMapWhileRetrievingFromFile();
1469    verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
1470  }
1471
1472  /**
1473   * Check whether we tolerate IO error this time. If the duration of IOEngine throwing errors
1474   * exceeds ioErrorsDurationTimeTolerated, we will disable the cache
1475   */
1476  private void checkIOErrorIsTolerated() {
1477    long now = EnvironmentEdgeManager.currentTime();
1478    // Do a single read to a local variable to avoid timing issue - HBASE-24454
1479    long ioErrorStartTimeTmp = this.ioErrorStartTime;
1480    if (ioErrorStartTimeTmp > 0) {
1481      if (cacheEnabled && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) {
1482        LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration
1483          + "ms, disabling cache, please check your IOEngine");
1484        disableCache();
1485      }
1486    } else {
1487      this.ioErrorStartTime = now;
1488    }
1489  }
1490
1491  /**
1492   * Used to shut down the cache -or- turn it off in the case of something broken.
1493   */
1494  private void disableCache() {
1495    if (!cacheEnabled) return;
1496    LOG.info("Disabling cache");
1497    cacheEnabled = false;
1498    ioEngine.shutdown();
1499    this.scheduleThreadPool.shutdown();
    for (int i = 0; i < writerThreads.length; ++i) {
      writerThreads[i].interrupt();
    }
1502    this.ramCache.clear();
    if (!ioEngine.isPersistent() || persistencePath == null) {
      // Only clear the in-memory maps for a non-persistent cache; with a persistent ioengine and a
      // persistence path, we keep them so the backingMap can be serialized out on shutdown.
1505      this.backingMap.clear();
1506      this.blocksByHFile.clear();
1507      this.fullyCachedFiles.clear();
1508      this.regionCachedSizeMap.clear();
1509    }
1510  }
1511
1512  private void join() throws InterruptedException {
    for (int i = 0; i < writerThreads.length; ++i) {
      writerThreads[i].join();
    }
1515  }
1516
1517  @Override
1518  public void shutdown() {
1519    disableCache();
1520    LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write="
1521      + persistencePath);
1522    if (ioEngine.isPersistent() && persistencePath != null) {
1523      try {
1524        join();
1525        if (cachePersister != null) {
1526          LOG.info("Shutting down cache persister thread.");
1527          cachePersister.shutdown();
1528          while (cachePersister.isAlive()) {
1529            Thread.sleep(10);
1530          }
1531        }
1532        persistToFile();
1533      } catch (IOException ex) {
1534        LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
1535      } catch (InterruptedException e) {
1536        LOG.warn("Failed to persist data on exit", e);
1537      }
1538    }
1539  }
1540
1541  /**
1542   * Needed mostly for UTs that might run in the same VM and create different BucketCache instances
1543   * on different UT methods.
1544   */
1545  @Override
1546  protected void finalize() {
1547    if (cachePersister != null && !cachePersister.isInterrupted()) {
1548      cachePersister.interrupt();
1549    }
1550  }
1551
1552  @Override
1553  public CacheStats getStats() {
1554    return cacheStats;
1555  }
1556
1557  public BucketAllocator getAllocator() {
1558    return this.bucketAllocator;
1559  }
1560
1561  @Override
1562  public long heapSize() {
1563    return this.heapSize.sum();
1564  }
1565
1566  @Override
1567  public long size() {
1568    return this.realCacheSize.sum();
1569  }
1570
1571  @Override
1572  public long getCurrentDataSize() {
1573    return size();
1574  }
1575
1576  @Override
1577  public long getFreeSize() {
1578    return this.bucketAllocator.getFreeSize();
1579  }
1580
1581  @Override
1582  public long getBlockCount() {
1583    return this.blockNumber.sum();
1584  }
1585
1586  @Override
1587  public long getDataBlockCount() {
1588    return getBlockCount();
1589  }
1590
1591  @Override
1592  public long getCurrentSize() {
1593    return this.bucketAllocator.getUsedSize();
1594  }
1595
1596  protected String getAlgorithm() {
1597    return algorithm;
1598  }
1599
1600  /**
1601   * Evicts all blocks for a specific HFile.
1602   * <p>
1603   * This is used for evict-on-close to remove all blocks of a specific HFile.
1604   * @return the number of blocks evicted
1605   */
1606  @Override
1607  public int evictBlocksByHfileName(String hfileName) {
1608    removeFileFromPrefetch(hfileName);
1609    Set<BlockCacheKey> keySet = getAllCacheKeysForFile(hfileName);
1610    int numEvicted = 0;
1611    for (BlockCacheKey key : keySet) {
1612      if (evictBlock(key)) {
1613        ++numEvicted;
1614      }
1615    }
1616    return numEvicted;
1617  }
1618
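  /**
   * Returns all cache keys for the given file by range-scanning {@code blocksByHFile}, which is
   * ordered by hfileName then offset; the MIN_VALUE/MAX_VALUE offsets bound the per-file range.
   */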
1619  private Set<BlockCacheKey> getAllCacheKeysForFile(String hfileName) {
1620    return blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE), true,
1621      new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
1622  }
1623
1624  /**
1625   * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each
1626   * priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate
1627   * number of elements out of each according to configuration parameters and their relative sizes.
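   * <p>
   * An illustrative sketch of how an eviction pass is expected to use this class (the variable
   * names and the budget value are hypothetical, not the exact freeSpace() implementation):
   *
   * <pre>
   *   BucketEntryGroup group = new BucketEntryGroup(bytesToFree, blockSize, groupBudget);
   *   candidateEntries.forEach(group::add);      // route entries of one priority into the group
   *   long freed = group.free(group.overflow()); // evict until the group fits within its budget
   * </pre>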
1628   */
1629  private class BucketEntryGroup {
1630
1631    private CachedEntryQueue queue;
1632    private long totalSize = 0;
1633    private long bucketSize;
1634
1635    public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
1636      this.bucketSize = bucketSize;
1637      queue = new CachedEntryQueue(bytesToFree, blockSize);
1638      totalSize = 0;
1639    }
1640
1641    public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
1642      totalSize += block.getValue().getLength();
1643      queue.add(block);
1644    }
1645
1646    public long free(long toFree) {
1647      Map.Entry<BlockCacheKey, BucketEntry> entry;
1648      long freedBytes = 0;
      // TODO: avoid a cycling situation where we find no block that is not in use and therefore
      // cannot free anything. What to do then? Fail the caching attempt? That would need changes
      // in the cacheBlock API.
1651      while ((entry = queue.pollLast()) != null) {
1652        BlockCacheKey blockCacheKey = entry.getKey();
1653        BucketEntry be = entry.getValue();
1654        if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) {
1655          freedBytes += be.getLength();
1656        }
1657        if (freedBytes >= toFree) {
1658          return freedBytes;
1659        }
1660      }
1661      return freedBytes;
1662    }
1663
1664    public long overflow() {
1665      return totalSize - bucketSize;
1666    }
1667
1668    public long totalSize() {
1669      return totalSize;
1670    }
1671  }
1672
1673  /**
   * Block entry stored in memory, holding the key, the data and related metadata.
1675   */
1676  static class RAMQueueEntry {
1677    private final BlockCacheKey key;
1678    private final Cacheable data;
1679    private long accessCounter;
1680    private boolean inMemory;
1681    private boolean isCachePersistent;
1682
1683    RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory,
1684      boolean isCachePersistent) {
1685      this.key = bck;
1686      this.data = data;
1687      this.accessCounter = accessCounter;
1688      this.inMemory = inMemory;
1689      this.isCachePersistent = isCachePersistent;
1690    }
1691
1692    public Cacheable getData() {
1693      return data;
1694    }
1695
1696    public BlockCacheKey getKey() {
1697      return key;
1698    }
1699
1700    public void access(long accessCounter) {
1701      this.accessCounter = accessCounter;
1702    }
1703
1704    private ByteBuffAllocator getByteBuffAllocator() {
1705      if (data instanceof HFileBlock) {
1706        return ((HFileBlock) data).getByteBuffAllocator();
1707      }
1708      return ByteBuffAllocator.HEAP;
1709    }
1710
1711    public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
1712      final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler,
1713      ByteBuffer metaBuff) throws IOException {
1714      int len = data.getSerializedLength();
1715      // This cacheable thing can't be serialized
1716      if (len == 0) {
1717        return null;
1718      }
1719      if (isCachePersistent && data instanceof HFileBlock) {
1720        len += Long.BYTES; // we need to record the cache time for consistency check in case of
1721                           // recovery
1722      }
1723      long offset = alloc.allocateBlock(len);
1724      boolean succ = false;
1725      BucketEntry bucketEntry = null;
1726      try {
1727        int diskSizeWithHeader = (data instanceof HFileBlock)
1728          ? ((HFileBlock) data).getOnDiskSizeWithHeader()
1729          : data.getSerializedLength();
1730        bucketEntry = new BucketEntry(offset, len, diskSizeWithHeader, accessCounter, inMemory,
1731          createRecycler, getByteBuffAllocator());
1732        bucketEntry.setDeserializerReference(data.getDeserializer());
1733        if (data instanceof HFileBlock) {
1734          // If an instance of HFileBlock, save on some allocations.
1735          HFileBlock block = (HFileBlock) data;
1736          ByteBuff sliceBuf = block.getBufferReadOnly();
1737          block.getMetaData(metaBuff);
          // For a persistent cache, write the cache time ahead of the block and metadata parts so
          // consistency can be checked on recovery; the on-media layout is then
          // [cachedTime (8 bytes)][block][metadata].
1739          if (isCachePersistent) {
1740            ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
1741            buffer.putLong(bucketEntry.getCachedTime());
1742            buffer.rewind();
1743            ioEngine.write(buffer, offset);
1744            ioEngine.write(sliceBuf, (offset + Long.BYTES));
1745          } else {
1746            ioEngine.write(sliceBuf, offset);
1747          }
1748          ioEngine.write(metaBuff, offset + len - metaBuff.limit());
1749        } else {
1750          // Only used for testing.
1751          ByteBuffer bb = ByteBuffer.allocate(len);
1752          data.serialize(bb, true);
1753          ioEngine.write(bb, offset);
1754        }
1755        succ = true;
1756      } finally {
1757        if (!succ) {
1758          alloc.freeBlock(offset, len);
1759        }
1760      }
1761      realCacheSize.add(len);
1762      return bucketEntry;
1763    }
1764  }
1765
1766  /**
1767   * Only used in test
1768   */
1769  void stopWriterThreads() throws InterruptedException {
1770    for (WriterThread writerThread : writerThreads) {
1771      writerThread.disableWriter();
1772      writerThread.interrupt();
1773      writerThread.join();
1774    }
1775  }
1776
1777  @Override
1778  public Iterator<CachedBlock> iterator() {
    // Don't bother with the ramCache, since blocks stay there only briefly.
1780    final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = this.backingMap.entrySet().iterator();
1781    return new Iterator<CachedBlock>() {
1782      private final long now = System.nanoTime();
1783
1784      @Override
1785      public boolean hasNext() {
1786        return i.hasNext();
1787      }
1788
1789      @Override
1790      public CachedBlock next() {
1791        final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
1792        return new CachedBlock() {
1793          @Override
1794          public String toString() {
1795            return BlockCacheUtil.toString(this, now);
1796          }
1797
1798          @Override
1799          public BlockPriority getBlockPriority() {
1800            return e.getValue().getPriority();
1801          }
1802
1803          @Override
1804          public BlockType getBlockType() {
1805            // Not held by BucketEntry. Could add it if wanted on BucketEntry creation.
1806            return null;
1807          }
1808
1809          @Override
1810          public long getOffset() {
1811            return e.getKey().getOffset();
1812          }
1813
1814          @Override
1815          public long getSize() {
1816            return e.getValue().getLength();
1817          }
1818
1819          @Override
1820          public long getCachedTime() {
1821            return e.getValue().getCachedTime();
1822          }
1823
1824          @Override
1825          public String getFilename() {
1826            return e.getKey().getHfileName();
1827          }
1828
1829          @Override
1830          public int compareTo(CachedBlock other) {
1831            int diff = this.getFilename().compareTo(other.getFilename());
1832            if (diff != 0) return diff;
1833
1834            diff = Long.compare(this.getOffset(), other.getOffset());
1835            if (diff != 0) return diff;
1836            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
1837              throw new IllegalStateException(
1838                "" + this.getCachedTime() + ", " + other.getCachedTime());
1839            }
1840            return Long.compare(other.getCachedTime(), this.getCachedTime());
1841          }
1842
1843          @Override
1844          public int hashCode() {
1845            return e.getKey().hashCode();
1846          }
1847
1848          @Override
1849          public boolean equals(Object obj) {
1850            if (obj instanceof CachedBlock) {
1851              CachedBlock cb = (CachedBlock) obj;
1852              return compareTo(cb) == 0;
1853            } else {
1854              return false;
1855            }
1856          }
1857        };
1858      }
1859
1860      @Override
1861      public void remove() {
1862        throw new UnsupportedOperationException();
1863      }
1864    };
1865  }
1866
1867  @Override
1868  public BlockCache[] getBlockCaches() {
1869    return null;
1870  }
1871
1872  public int getRpcRefCount(BlockCacheKey cacheKey) {
1873    BucketEntry bucketEntry = backingMap.get(cacheKey);
1874    if (bucketEntry != null) {
1875      return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 0 : 1);
1876    }
1877    return 0;
1878  }
1879
1880  float getAcceptableFactor() {
1881    return acceptableFactor;
1882  }
1883
1884  float getMinFactor() {
1885    return minFactor;
1886  }
1887
1888  float getExtraFreeFactor() {
1889    return extraFreeFactor;
1890  }
1891
1892  float getSingleFactor() {
1893    return singleFactor;
1894  }
1895
1896  float getMultiFactor() {
1897    return multiFactor;
1898  }
1899
1900  float getMemoryFactor() {
1901    return memoryFactor;
1902  }
1903
1904  public String getPersistencePath() {
1905    return persistencePath;
1906  }
1907
1908  /**
   * Wraps the delegate ConcurrentMap while maintaining reference counts for its cached blocks.
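   * <p>
   * {@link #get(BlockCacheKey)} retains the returned block for the caller, so each successful get
   * must be paired with a release once the caller is done. An illustrative sketch (not a snippet
   * from this class):
   *
   * <pre>
   *   RAMQueueEntry re = ramCache.get(key);
   *   if (re != null) {
   *     try {
   *       // use re.getData()
   *     } finally {
   *       re.getData().release();
   *     }
   *   }
   * </pre>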
1910   */
1911  static class RAMCache {
1912    /**
     * The map is declared as a {@link ConcurrentHashMap} explicitly because, in
     * {@link RAMCache#get(BlockCacheKey)} and
     * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)}, we need to guarantee
     * the atomicity of map#computeIfPresent(key, func) and map#putIfAbsent(key, func). The func
     * must execute exactly once, only when the key is present (or absent) and under the map's lock
     * context; otherwise the block's reference count gets corrupted. Note that
     * {@link java.util.concurrent.ConcurrentSkipListMap} cannot guarantee that.
1920     */
1921    final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>();
1922
1923    public boolean containsKey(BlockCacheKey key) {
1924      return delegate.containsKey(key);
1925    }
1926
1927    public RAMQueueEntry get(BlockCacheKey key) {
1928      return delegate.computeIfPresent(key, (k, re) -> {
        // The block will be referenced by the RPC path, so retain atomically here. If the get and
        // the retain were not atomic, another thread could remove and release the block in between,
        // and we would retain a block with refCnt=0, which is disallowed (see HBASE-22422).
1932        re.getData().retain();
1933        return re;
1934      });
1935    }
1936
1937    /**
     * Returns the previously associated value, or null if absent. It has the same meaning as
1939     * {@link ConcurrentMap#putIfAbsent(Object, Object)}
1940     */
1941    public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) {
1942      AtomicBoolean absent = new AtomicBoolean(false);
1943      RAMQueueEntry re = delegate.computeIfAbsent(key, k -> {
        // The RAMCache now references this entry, so its reference count must be incremented.
1945        entry.getData().retain();
1946        absent.set(true);
1947        return entry;
1948      });
1949      return absent.get() ? null : re;
1950    }
1951
1952    public boolean remove(BlockCacheKey key) {
1953      return remove(key, re -> {
1954      });
1955    }
1956
1957    /**
     * A {@link Consumer} is taken here because, once the removed entry releases its reference
     * count, its ByteBuffers may be recycled and accessing them outside this method would throw an
     * exception. The consumer gets to access the entry being removed before its reference count is
     * released. Note: do not change the reference count inside the {@link Consumer}.
1962     */
1963    public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) {
1964      RAMQueueEntry previous = delegate.remove(key);
1965      action.accept(previous);
1966      if (previous != null) {
1967        previous.getData().release();
1968      }
1969      return previous != null;
1970    }
1971
1972    public boolean isEmpty() {
1973      return delegate.isEmpty();
1974    }
1975
1976    public void clear() {
1977      Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator();
1978      while (it.hasNext()) {
1979        RAMQueueEntry re = it.next().getValue();
1980        it.remove();
1981        re.getData().release();
1982      }
1983    }
1984
1985    public boolean hasBlocksForFile(String fileName) {
1986      return delegate.keySet().stream().filter(key -> key.getHfileName().equals(fileName))
1987        .findFirst().isPresent();
1988    }
1989  }
1990
1991  public Map<BlockCacheKey, BucketEntry> getBackingMap() {
1992    return backingMap;
1993  }
1994
1995  public AtomicBoolean getBackingMapValidated() {
1996    return backingMapValidated;
1997  }
1998
1999  @Override
2000  public Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() {
2001    return Optional.of(fullyCachedFiles);
2002  }
2003
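  /**
   * Convenience helper that pulls the {@link BucketCache} (if any) out of a {@link CacheConfig},
   * whether it is configured directly as the block cache or as the second level of a
   * {@link CombinedBlockCache}. An illustrative usage sketch (the {@code cacheConf} variable is
   * assumed to already exist):
   *
   * <pre>
   *   getBucketCacheFromCacheConfig(cacheConf)
   *     .ifPresent(bc -> LOG.info("Bucket cache used size: {}", bc.getCurrentSize()));
   * </pre>
   */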
2004  public static Optional<BucketCache> getBucketCacheFromCacheConfig(CacheConfig cacheConf) {
2005    if (cacheConf.getBlockCache().isPresent()) {
2006      BlockCache bc = cacheConf.getBlockCache().get();
2007      if (bc instanceof CombinedBlockCache) {
2008        BlockCache l2 = ((CombinedBlockCache) bc).getSecondLevelCache();
2009        if (l2 instanceof BucketCache) {
2010          return Optional.of((BucketCache) l2);
2011        }
2012      } else if (bc instanceof BucketCache) {
2013        return Optional.of((BucketCache) bc);
2014      }
2015    }
2016    return Optional.empty();
2017  }
2018
2019  @Override
2020  public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int dataBlockCount,
2021    long size) {
2022    // block eviction may be happening in the background as prefetch runs,
2023    // so we need to count all blocks for this file in the backing map under
2024    // a read lock for the block offset
2025    final List<ReentrantReadWriteLock> locks = new ArrayList<>();
2026    LOG.debug("Notifying caching completed for file {}, with total blocks {}, and data blocks {}",
2027      fileName, totalBlockCount, dataBlockCount);
2028    try {
2029      final MutableInt count = new MutableInt();
2030      LOG.debug("iterating over {} entries in the backing map", backingMap.size());
2031      backingMap.entrySet().stream().forEach(entry -> {
2032        if (
2033          entry.getKey().getHfileName().equals(fileName.getName())
2034            && entry.getKey().getBlockType().equals(BlockType.DATA)
2035        ) {
2036          long offsetToLock = entry.getValue().offset();
2037          LOG.debug("found block {} in the backing map. Acquiring read lock for offset {}",
2038            entry.getKey(), offsetToLock);
2039          ReentrantReadWriteLock lock = offsetLock.getLock(offsetToLock);
2040          lock.readLock().lock();
2041          locks.add(lock);
          // Recheck the key is still there (no eviction happened before the lock was acquired)
2043          if (backingMap.containsKey(entry.getKey())) {
2044            count.increment();
2045          } else {
2046            lock.readLock().unlock();
2047            locks.remove(lock);
2048            LOG.debug("found block {}, but when locked and tried to count, it was gone.");
2049          }
2050        }
2051      });
2052      int metaCount = totalBlockCount - dataBlockCount;
2053      // BucketCache would only have data blocks
2054      if (dataBlockCount == count.getValue()) {
2055        LOG.debug("File {} has now been fully cached.", fileName);
2056        fileCacheCompleted(fileName, size);
2057      } else {
2058        LOG.debug(
2059          "Prefetch executor completed for {}, but only {} data blocks were cached. "
2060            + "Total data blocks for file: {}. "
2061            + "Checking for blocks pending cache in cache writer queue.",
2062          fileName, count.getValue(), dataBlockCount);
2063        if (ramCache.hasBlocksForFile(fileName.getName())) {
2064          for (ReentrantReadWriteLock lock : locks) {
2065            lock.readLock().unlock();
2066          }
2067          LOG.debug("There are still blocks pending caching for file {}. Will sleep 100ms "
2068            + "and try the verification again.", fileName.getName());
2069          Thread.sleep(100);
2070          notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size);
2071        } else
2072          if ((getAllCacheKeysForFile(fileName.getName()).size() - metaCount) == dataBlockCount) {
2073            LOG.debug("We counted {} data blocks, expected was {}, there was no more pending in "
2074              + "the cache write queue but we now found that total cached blocks for file {} "
2075              + "is equal to data block count.", count, dataBlockCount, fileName.getName());
2076            fileCacheCompleted(fileName, size);
2077          } else {
2078            LOG.info("We found only {} data blocks cached from a total of {} for file {}, "
2079              + "but no blocks pending caching. Maybe cache is full or evictions "
2080              + "happened concurrently to cache prefetch.", count, dataBlockCount, fileName);
2081          }
2082      }
2083    } catch (InterruptedException e) {
2084      throw new RuntimeException(e);
2085    } finally {
2086      for (ReentrantReadWriteLock lock : locks) {
2087        lock.readLock().unlock();
2088      }
2089    }
2090  }
2091
2092  @Override
2093  public void notifyFileBlockEvicted(String fileName) {
2094    removeFileFromPrefetch(fileName);
2095  }
2096
2097  @Override
2098  public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
2099    long currentUsed = bucketAllocator.getUsedSize();
2100    boolean result = (currentUsed + block.getOnDiskSizeWithHeader()) < acceptableSize();
2101    return Optional.of(result);
2102  }
2103
2104  @Override
2105  public Optional<Boolean> shouldCacheFile(String fileName) {
2106    // if we don't have the file in fullyCachedFiles, we should cache it
2107    return Optional.of(!fullyCachedFiles.containsKey(fileName));
2108  }
2109
2110  @Override
2111  public Optional<Boolean> isAlreadyCached(BlockCacheKey key) {
2112    return Optional.of(getBackingMap().containsKey(key));
2113  }
2114
2115  @Override
2116  public Optional<Integer> getBlockSize(BlockCacheKey key) {
2117    BucketEntry entry = backingMap.get(key);
2118    if (entry == null) {
2119      return Optional.empty();
2120    } else {
2121      return Optional.of(entry.getOnDiskSizeWithHeader());
2122    }
2123
2124  }
2125
2126  private void removeFileFromPrefetch(String hfileName) {
    // Update regionCachedSizeMap before removing the file from fullyCachedFiles
2128    if (fullyCachedFiles.containsKey(hfileName)) {
2129      Pair<String, Long> regionEntry = fullyCachedFiles.get(hfileName);
2130      String regionEncodedName = regionEntry.getFirst();
2131      long filePrefetchSize = regionEntry.getSecond();
2132      LOG.debug("Removing file {} for region {}", hfileName, regionEncodedName);
2133      regionCachedSizeMap.computeIfPresent(regionEncodedName, (rn, pf) -> pf - filePrefetchSize);
2134      // If all the blocks for a region are evicted from the cache, remove the entry for that region
2135      if (
2136        regionCachedSizeMap.containsKey(regionEncodedName)
2137          && regionCachedSizeMap.get(regionEncodedName) == 0
2138      ) {
2139        regionCachedSizeMap.remove(regionEncodedName);
2140      }
2141    }
2142    fullyCachedFiles.remove(hfileName);
2143  }
2144
2145  public void fileCacheCompleted(Path filePath, long size) {
2146    Pair<String, Long> pair = new Pair<>();
    // The region name is the grandparent directory of the file path (region/family/hfile)
2148    String regionName = filePath.getParent().getParent().getName();
2149    pair.setFirst(regionName);
2150    pair.setSecond(size);
2151    fullyCachedFiles.put(filePath.getName(), pair);
2152    regionCachedSizeMap.merge(regionName, size, (oldpf, fileSize) -> oldpf + fileSize);
2153  }
2154
2155}