001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static java.util.Objects.requireNonNull; 021 022import java.lang.ref.WeakReference; 023import java.util.EnumMap; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027import java.util.PriorityQueue; 028import java.util.SortedSet; 029import java.util.TreeSet; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.Executors; 032import java.util.concurrent.ScheduledExecutorService; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicLong; 035import java.util.concurrent.atomic.LongAdder; 036import java.util.concurrent.locks.ReentrantLock; 037import org.apache.commons.lang3.mutable.MutableBoolean; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.hbase.io.HeapSize; 040import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 041import org.apache.hadoop.hbase.util.ClassSize; 042import org.apache.hadoop.util.StringUtils; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import 
org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 048import org.apache.hbase.thirdparty.com.google.common.base.Objects; 049import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 050 051/** 052 * A block cache implementation that is memory-aware using {@link HeapSize}, memory-bound using an 053 * LRU eviction algorithm, and concurrent: backed by a {@link ConcurrentHashMap} and with a 054 * non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock} 055 * operations. 056 * <p> 057 * Contains three levels of block priority to allow for scan-resistance and in-memory families 058 * {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (An in-memory column 059 * family is a column family that should be served from memory if possible): single-access, 060 * multiple-accesses, and in-memory priority. A block is added with an in-memory priority flag if 061 * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}, otherwise a block becomes a 062 * single access priority the first time it is read into this block cache. If a block is accessed 063 * again while in cache, it is marked as a multiple access priority block. This delineation of 064 * blocks is used to prevent scans from thrashing the cache adding a least-frequently-used element 065 * to the eviction algorithm. 066 * <p> 067 * Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each 068 * priority will retain close to its maximum size, however, if any priority is not using its entire 069 * chunk the others are able to grow beyond their chunk size. 070 * <p> 071 * Instantiated at a minimum with the total size and average block size. All sizes are in bytes. The 072 * block size is not especially important as this cache is fully dynamic in its sizing of blocks. It 073 * is only used for pre-allocating data structures and in initial heap estimation of the map. 
074 * <p> 075 * The detailed constructor defines the sizes for the three priorities (they should total to the 076 * <code>maximum size</code> defined). It also sets the levels that trigger and control the eviction 077 * thread. 078 * <p> 079 * The <code>acceptable size</code> is the cache size level which triggers the eviction process to 080 * start. It evicts enough blocks to get the size below the minimum size specified. 081 * <p> 082 * Eviction happens in a separate thread and involves a single full-scan of the map. It determines 083 * how many bytes must be freed to reach the minimum size, and then while scanning determines the 084 * fewest least-recently-used blocks necessary from each of the three priorities (would be 3 times 085 * bytes to free). It then uses the priority chunk sizes to evict fairly according to the relative 086 * sizes and usage. 087 */ 088@InterfaceAudience.Private 089public class LruBlockCache implements FirstLevelBlockCache { 090 091 private static final Logger LOG = LoggerFactory.getLogger(LruBlockCache.class); 092 093 /** 094 * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep 095 * evicting during an eviction run till the cache size is down to 80% of the total. 
096 */ 097 private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor"; 098 099 /** 100 * Configuration key for the acceptable factor of the cache (no evictions if size < acceptable) 101 */ 102 private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = 103 "hbase.lru.blockcache.acceptable.factor"; 104 105 /** 106 * Configuration key for the hard capacity limit factor; a put is rejected once size >= this * acceptable 107 */ 108 static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME = 109 "hbase.lru.blockcache.hard.capacity.limit.factor"; 110 /** Configuration keys for the share of the cache given to the single, multi and in-memory priorities */ private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = 111 "hbase.lru.blockcache.single.percentage"; 112 private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = 113 "hbase.lru.blockcache.multi.percentage"; 114 private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = 115 "hbase.lru.blockcache.memory.percentage"; 116 117 /** 118 * Configuration key to force data-block always (except in-memory are too much) cached in memory 119 * for in-memory hfile, unlike inMemory, which is a column-family configuration, inMemoryForceMode 120 * is a cluster-wide configuration 121 */ 122 private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = 123 "hbase.lru.rs.inmemoryforcemode"; 124 125 /* Default Configuration Parameters */ 126 127 /* Backing Concurrent Map Configuration */ 128 static final float DEFAULT_LOAD_FACTOR = 0.75f; 129 static final int DEFAULT_CONCURRENCY_LEVEL = 16; 130 131 /* Eviction thresholds */ 132 private static final float DEFAULT_MIN_FACTOR = 0.95f; 133 static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f; 134 135 /* Priority buckets */ 136 private static final float DEFAULT_SINGLE_FACTOR = 0.25f; 137 private static final float DEFAULT_MULTI_FACTOR = 0.50f; 138 private static final float DEFAULT_MEMORY_FACTOR = 0.25f; 139 140 private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f; 141 142 private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false; 143 144 /* Statistics thread */ 145 
private static final int STAT_THREAD_PERIOD = 60 * 5; 146 private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size"; 147 private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L; 148 149 /** 150 * The cache map is declared as a {@link ConcurrentHashMap} here, because in 151 * {@link LruBlockCache#getBlock}, we need to guarantee the atomicity of map#computeIfPresent(key, func). 152 * Besides, the func method must execute exactly once only when the key is present and under the 153 * lock context, otherwise the reference count will be messed up. Notice that the 154 * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that. Some code using 155 * #computeIfPresent also expects the supplier to be executed only once. ConcurrentHashMap can 156 * guarantee that. Other types may not. 157 */ 158 private transient final ConcurrentHashMap<BlockCacheKey, LruCachedBlock> map; 159 160 /** Eviction lock (locked when eviction in process) */ 161 private transient final ReentrantLock evictionLock = new ReentrantLock(true); 162 163 /** Blocks whose heap size exceeds this many bytes are not cached */ private final long maxBlockSize; 164 165 /** Volatile boolean to track if we are in an eviction process or not */ 166 private volatile boolean evictionInProgress = false; 167 168 /** Eviction thread */ 169 private transient final EvictionThread evictionThread; 170 171 /** Statistics thread schedule pool (for heavy debugging, could remove) */ 172 private transient final ScheduledExecutorService scheduleThreadPool = 173 Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() 174 .setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build()); 175 176 /** Current size of cache */ 177 private final AtomicLong size; 178 179 /** Current size of data blocks */ 180 private final LongAdder dataBlockSize = new LongAdder(); 181 182 /** Current size of index blocks */ 183 private final LongAdder indexBlockSize = new LongAdder(); 184 185 /** Current size of bloom blocks */ 186 private final LongAdder bloomBlockSize = new 
LongAdder(); 187 188 /** Current number of cached elements */ 189 private final AtomicLong elements; 190 191 /** Current number of cached data block elements */ 192 private final LongAdder dataBlockElements = new LongAdder(); 193 194 /** Current number of cached index block elements */ 195 private final LongAdder indexBlockElements = new LongAdder(); 196 197 /** Current number of cached bloom block elements */ 198 private final LongAdder bloomBlockElements = new LongAdder(); 199 200 /** Cache access count (sequential ID) */ 201 private final AtomicLong count; 202 203 /** Hard capacity limit factor; multiplied by the acceptable size to get the size at which puts are rejected */ 204 private float hardCapacityLimitFactor; 205 206 /** Cache statistics */ 207 private final CacheStats stats; 208 209 /** Maximum allowable size of cache (block put if size > max, evict) */ 210 private long maxSize; 211 212 /** Approximate block size */ 213 private long blockSize; 214 215 /** Acceptable size of cache (no evictions if size < acceptable) */ 216 private float acceptableFactor; 217 218 /** Minimum threshold of cache (when evicting, evict until size < min) */ 219 private float minFactor; 220 221 /** Single access bucket size */ 222 private float singleFactor; 223 224 /** Multiple access bucket size */ 225 private float multiFactor; 226 227 /** In-memory bucket size */ 228 private float memoryFactor; 229 230 /** Overhead of the structure itself */ 231 private long overhead; 232 233 /** Whether in-memory hfile's data block has higher priority when evicting */ 234 private boolean forceInMemory; 235 236 /** 237 * Where to send victims (blocks evicted/missing from the cache). This is used only when we use an 238 * external cache as L2. Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache 239 */ 240 private transient BlockCache victimHandler = null; 241 242 /** 243 * Default constructor. Specify maximum size and expected average block size (approximation is 244 * fine). 
245 * <p> 246 * All other factors will be calculated based on defaults specified in this class. 247 * @param maxSize maximum size of cache, in bytes 248 * @param blockSize approximate size of each block, in bytes 249 */ 250 public LruBlockCache(long maxSize, long blockSize) { 251 this(maxSize, blockSize, true); 252 } 253 254 /** 255 * Constructor used for testing. Allows disabling of the eviction thread. 256 */ 257 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) { 258 this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / blockSize), 259 DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR, 260 DEFAULT_SINGLE_FACTOR, DEFAULT_MULTI_FACTOR, DEFAULT_MEMORY_FACTOR, 261 DEFAULT_HARD_CAPACITY_LIMIT_FACTOR, false, DEFAULT_MAX_BLOCK_SIZE); 262 } 263 264 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) { 265 this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / blockSize), 266 DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, 267 conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR), 268 conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR), 269 conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR), 270 conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR), 271 conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR), 272 conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, DEFAULT_HARD_CAPACITY_LIMIT_FACTOR), 273 conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE), 274 conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE)); 275 } 276 277 public LruBlockCache(long maxSize, long blockSize, Configuration conf) { 278 this(maxSize, blockSize, true, conf); 279 } 280 281 /** 282 * Configurable constructor. Use this constructor if not using defaults. 
283 * @param maxSize maximum size of this cache, in bytes 284 * @param blockSize expected average size of blocks, in bytes 285 * @param evictionThread whether to run evictions in a bg thread or not 286 * @param mapInitialSize initial size of backing ConcurrentHashMap 287 * @param mapLoadFactor initial load factor of backing ConcurrentHashMap 288 * @param mapConcurrencyLevel initial concurrency factor for backing CHM 289 * @param minFactor percentage of total size that eviction will evict until 290 * @param acceptableFactor percentage of total size that triggers eviction 291 * @param singleFactor percentage of total size for single-access blocks 292 * @param multiFactor percentage of total size for multiple-access blocks 293 * @param memoryFactor percentage of total size for in-memory blocks 294 */ 295 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, int mapInitialSize, 296 float mapLoadFactor, int mapConcurrencyLevel, float minFactor, float acceptableFactor, 297 float singleFactor, float multiFactor, float memoryFactor, float hardLimitFactor, 298 boolean forceInMemory, long maxBlockSize) { 299 this.maxBlockSize = maxBlockSize; 300 if ( 301 singleFactor + multiFactor + memoryFactor != 1 || singleFactor < 0 || multiFactor < 0 302 || memoryFactor < 0 303 ) { 304 throw new IllegalArgumentException( 305 "Single, multi, and memory factors " + " should be non-negative and total 1.0"); 306 } 307 if (minFactor >= acceptableFactor) { 308 throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor"); 309 } 310 if (minFactor >= 1.0f || acceptableFactor >= 1.0f) { 311 throw new IllegalArgumentException("all factors must be < 1"); 312 } 313 this.maxSize = maxSize; 314 this.blockSize = blockSize; 315 this.forceInMemory = forceInMemory; 316 map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel); 317 this.minFactor = minFactor; 318 this.acceptableFactor = acceptableFactor; 319 this.singleFactor = 
singleFactor; 320 this.multiFactor = multiFactor; 321 this.memoryFactor = memoryFactor; 322 this.stats = new CacheStats(this.getClass().getSimpleName()); 323 this.count = new AtomicLong(0); 324 this.elements = new AtomicLong(0); 325 this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel); 326 this.size = new AtomicLong(this.overhead); 327 this.hardCapacityLimitFactor = hardLimitFactor; 328 if (evictionThread) { 329 this.evictionThread = new EvictionThread(this); 330 this.evictionThread.start(); // FindBugs SC_START_IN_CTOR 331 } else { 332 this.evictionThread = null; 333 } 334 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log 335 // every five minutes. 336 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD, 337 STAT_THREAD_PERIOD, TimeUnit.SECONDS); 338 } 339 340 @Override 341 public void setVictimCache(BlockCache victimCache) { 342 if (victimHandler != null) { 343 throw new IllegalArgumentException("The victim cache has already been set"); 344 } 345 victimHandler = requireNonNull(victimCache); 346 } 347 348 @Override 349 public void setMaxSize(long maxSize) { 350 this.maxSize = maxSize; 351 if (this.size.get() > acceptableSize() && !evictionInProgress) { 352 runEviction(); 353 } 354 } 355 356 /** 357 * The block cached in LRUBlockCache will always be an heap block: on the one side, the heap 358 * access will be more faster then off-heap, the small index block or meta block cached in 359 * CombinedBlockCache will benefit a lot. on other side, the LRUBlockCache size is always 360 * calculated based on the total heap size, if caching an off-heap block in LRUBlockCache, the 361 * heap size will be messed up. Here we will clone the block into an heap block if it's an 362 * off-heap block, otherwise just use the original block. The key point is maintain the refCnt of 363 * the block (HBASE-22127): <br> 364 * 1. 
if cache the cloned heap block, its refCnt is an totally new one, it's easy to handle; <br> 365 * 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's 366 * reservoir, if both RPC and LRUBlockCache release the block, then it can be garbage collected by 367 * JVM, so need a retain here. 368 * @param buf the original block 369 * @return an block with an heap memory backend. 370 */ 371 private Cacheable asReferencedHeapBlock(Cacheable buf) { 372 if (buf instanceof HFileBlock) { 373 HFileBlock blk = ((HFileBlock) buf); 374 if (blk.isSharedMem()) { 375 return HFileBlock.deepCloneOnHeap(blk); 376 } 377 } 378 // The block will be referenced by this LRUBlockCache, so should increase its refCnt here. 379 return buf.retain(); 380 } 381 382 // BlockCache implementation 383 384 /** 385 * Cache the block with the specified name and buffer. 386 * <p> 387 * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547) 388 * this can happen, for which we compare the buffer contents. 389 * @param cacheKey block's cache key 390 * @param buf block buffer 391 * @param inMemory if block is in-memory 392 */ 393 @Override 394 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { 395 if (buf.heapSize() > maxBlockSize) { 396 // If there are a lot of blocks that are too 397 // big this can make the logs way too noisy. 
398 // So we log 2% 399 if (stats.failInsert() % 50 == 0) { 400 LOG.warn("Trying to cache too large a block " + cacheKey.getHfileName() + " @ " 401 + cacheKey.getOffset() + " is " + buf.heapSize() + " which is larger than " 402 + maxBlockSize); 403 } 404 return; 405 } 406 407 LruCachedBlock cb = map.get(cacheKey); 408 if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) { 409 return; 410 } 411 long currentSize = size.get(); 412 long currentAcceptableSize = acceptableSize(); 413 long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize); 414 if (currentSize >= hardLimitSize) { 415 stats.failInsert(); 416 if (LOG.isTraceEnabled()) { 417 LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize) 418 + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "." 419 + " The hard limit size is " + StringUtils.byteDesc(hardLimitSize) 420 + ", failed to put cacheKey:" + cacheKey + " into LruBlockCache."); 421 } 422 if (!evictionInProgress) { 423 runEviction(); 424 } 425 return; 426 } 427 // Ensure that the block is an heap one. 428 buf = asReferencedHeapBlock(buf); 429 cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory); 430 long newSize = updateSizeMetrics(cb, false); 431 map.put(cacheKey, cb); 432 long val = elements.incrementAndGet(); 433 if (buf.getBlockType().isBloom()) { 434 bloomBlockElements.increment(); 435 } else if (buf.getBlockType().isIndex()) { 436 indexBlockElements.increment(); 437 } else if (buf.getBlockType().isData()) { 438 dataBlockElements.increment(); 439 } 440 if (LOG.isTraceEnabled()) { 441 long size = map.size(); 442 assertCounterSanity(size, val); 443 } 444 if (newSize > currentAcceptableSize && !evictionInProgress) { 445 runEviction(); 446 } 447 } 448 449 /** 450 * Sanity-checking for parity between actual block cache content and metrics. Intended only for 451 * use with TRACE level logging and -ea JVM. 
452 */ 453 private static void assertCounterSanity(long mapSize, long counterVal) { 454 if (counterVal < 0) { 455 LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal 456 + ", mapSize=" + mapSize); 457 return; 458 } 459 if (mapSize < Integer.MAX_VALUE) { 460 double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.); 461 if (pct_diff > 0.05) { 462 LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal 463 + ", mapSize=" + mapSize); 464 } 465 } 466 } 467 468 /** 469 * Cache the block with the specified name and buffer. 470 * <p> 471 * TODO after HBASE-22005, we may cache an block which allocated from off-heap, but our LRU cache 472 * sizing is based on heap size, so we should handle this in HBASE-22127. It will introduce an 473 * switch whether make the LRU on-heap or not, if so we may need copy the memory to on-heap, 474 * otherwise the caching size is based on off-heap. 475 * @param cacheKey block's cache key 476 * @param buf block buffer 477 */ 478 @Override 479 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 480 cacheBlock(cacheKey, buf, false); 481 } 482 483 /** 484 * Helper function that updates the local size counter and also updates any per-cf or 485 * per-blocktype metrics it can discern from given {@link LruCachedBlock} 486 */ 487 private long updateSizeMetrics(LruCachedBlock cb, boolean evict) { 488 long heapsize = cb.heapSize(); 489 BlockType bt = cb.getBuffer().getBlockType(); 490 if (evict) { 491 heapsize *= -1; 492 } 493 if (bt != null) { 494 if (bt.isBloom()) { 495 bloomBlockSize.add(heapsize); 496 } else if (bt.isIndex()) { 497 indexBlockSize.add(heapsize); 498 } else if (bt.isData()) { 499 dataBlockSize.add(heapsize); 500 } 501 } 502 return size.addAndGet(heapsize); 503 } 504 505 /** 506 * Get the buffer of the block with the specified name. 
507 * @param cacheKey block's cache key 508 * @param caching true if the caller caches blocks on cache misses 509 * @param repeat Whether this is a repeat lookup for the same block (used to avoid 510 * double counting cache misses when doing double-check locking) 511 * @param updateCacheMetrics Whether to update cache metrics or not 512 * @return buffer of specified cache key, or null if not in cache 513 */ 514 @Override 515 public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat, 516 boolean updateCacheMetrics) { 517 // Note: 'map' must be a ConcurrentHashMap or the supplier may be invoked more than once. 518 LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> { 519 // It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside 520 // this block. because if retain outside the map#computeIfPresent, the evictBlock may remove 521 // the block and release, then we're retaining a block with refCnt=0 which is disallowed. 522 // see HBASE-22422. 523 val.getBuffer().retain(); 524 return val; 525 }); 526 if (cb == null) { 527 if (!repeat && updateCacheMetrics) { 528 stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 529 } 530 // If there is another block cache then try and read there. 531 // However if this is a retry ( second time in double checked locking ) 532 // And it's already a miss then the l2 will also be a miss. 533 if (victimHandler != null && !repeat) { 534 // The handler will increase result's refCnt for RPC, so need no extra retain. 535 Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics); 536 // Promote this to L1. 
537 if (result != null) { 538 if (caching) { 539 cacheBlock(cacheKey, result, /* inMemory = */ false); 540 } 541 } 542 return result; 543 } 544 return null; 545 } 546 if (updateCacheMetrics) { 547 stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 548 } 549 cb.access(count.incrementAndGet()); 550 return cb.getBuffer(); 551 } 552 553 /** 554 * Whether the cache contains block with specified cacheKey 555 * @return true if contains the block 556 */ 557 @Override 558 public boolean containsBlock(BlockCacheKey cacheKey) { 559 return map.containsKey(cacheKey); 560 } 561 562 @Override 563 public boolean evictBlock(BlockCacheKey cacheKey) { 564 LruCachedBlock cb = map.get(cacheKey); 565 return cb != null && evictBlock(cb, false) > 0; 566 } 567 568 /** 569 * Evicts all blocks for a specific HFile. This is an expensive operation implemented as a 570 * linear-time search through all blocks in the cache. Ideally this should be a search in a 571 * log-access-time map. 572 * <p> 573 * This is used for evict-on-close to remove all blocks of a specific HFile. 
574 * @return the number of blocks evicted 575 */ 576 @Override 577 public int evictBlocksByHfileName(String hfileName) { 578 int numEvicted = 0; 579 for (BlockCacheKey key : map.keySet()) { 580 if (key.getHfileName().equals(hfileName)) { 581 if (evictBlock(key)) { 582 ++numEvicted; 583 } 584 } 585 } 586 if (victimHandler != null) { 587 numEvicted += victimHandler.evictBlocksByHfileName(hfileName); 588 } 589 return numEvicted; 590 } 591 592 /** 593 * Evict the block, and it will be cached by the victim handler if exists && block may be 594 * read again later 595 * @param evictedByEvictionProcess true if the given block is evicted by EvictionThread 596 * @return the heap size of evicted block 597 */ 598 protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) { 599 final MutableBoolean evicted = new MutableBoolean(false); 600 // Note: 'map' must be a ConcurrentHashMap or the supplier may be invoked more than once. 601 map.computeIfPresent(block.getCacheKey(), (k, v) -> { 602 // Run the victim handler before we remove the mapping in the L1 map. It must complete 603 // quickly because other removal or insertion operations can be blocked in the meantime. 604 if (evictedByEvictionProcess && victimHandler != null) { 605 victimHandler.cacheBlock(k, v.getBuffer()); 606 } 607 // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. DO 608 // NOT move this up because if we do that then the victimHandler may access the buffer with 609 // refCnt = 0 which is disallowed. 610 v.getBuffer().release(); 611 evicted.setTrue(); 612 // By returning null from the supplier we remove the mapping from the L1 map. 613 return null; 614 }); 615 // If we didn't find anything to evict there is nothing more to do here. 616 if (evicted.isFalse()) { 617 return 0; 618 } 619 // We evicted the block so update L1 statistics. 
620 updateSizeMetrics(block, true); 621 long val = elements.decrementAndGet(); 622 if (LOG.isTraceEnabled()) { 623 long size = map.size(); 624 assertCounterSanity(size, val); 625 } 626 BlockType bt = block.getBuffer().getBlockType(); 627 if (bt.isBloom()) { 628 bloomBlockElements.decrement(); 629 } else if (bt.isIndex()) { 630 indexBlockElements.decrement(); 631 } else if (bt.isData()) { 632 dataBlockElements.decrement(); 633 } 634 if (evictedByEvictionProcess) { 635 // When the eviction of the block happened because of invalidation of HFiles, no need to 636 // update the stats counter. 637 stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary()); 638 } 639 return block.heapSize(); 640 } 641 642 /** 643 * Multi-threaded call to run the eviction process. 644 */ 645 private void runEviction() { 646 if (evictionThread == null || !evictionThread.isGo()) { 647 evict(); 648 } else { 649 evictionThread.evict(); 650 } 651 } 652 653 boolean isEvictionInProgress() { 654 return evictionInProgress; 655 } 656 657 long getOverhead() { 658 return overhead; 659 } 660 661 /** 662 * Eviction method. 
663 */ 664 void evict() { 665 666 // Ensure only one eviction at a time 667 if (!evictionLock.tryLock()) { 668 return; 669 } 670 671 try { 672 evictionInProgress = true; 673 long currentSize = this.size.get(); 674 long bytesToFree = currentSize - minSize(); 675 676 if (LOG.isTraceEnabled()) { 677 LOG.trace("Block cache LRU eviction started; Attempting to free " 678 + StringUtils.byteDesc(bytesToFree) + " of total=" + StringUtils.byteDesc(currentSize)); 679 } 680 681 if (bytesToFree <= 0) { 682 return; 683 } 684 685 // Instantiate priority buckets 686 BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, singleSize()); 687 BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, multiSize()); 688 BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, memorySize()); 689 690 // Scan entire map putting into appropriate buckets 691 for (LruCachedBlock cachedBlock : map.values()) { 692 switch (cachedBlock.getPriority()) { 693 case SINGLE: { 694 bucketSingle.add(cachedBlock); 695 break; 696 } 697 case MULTI: { 698 bucketMulti.add(cachedBlock); 699 break; 700 } 701 case MEMORY: { 702 bucketMemory.add(cachedBlock); 703 break; 704 } 705 } 706 } 707 708 long bytesFreed = 0; 709 if (forceInMemory || memoryFactor > 0.999f) { 710 long s = bucketSingle.totalSize(); 711 long m = bucketMulti.totalSize(); 712 if (bytesToFree > (s + m)) { 713 // this means we need to evict blocks in memory bucket to make room, 714 // so the single and multi buckets will be emptied 715 bytesFreed = bucketSingle.free(s); 716 bytesFreed += bucketMulti.free(m); 717 if (LOG.isTraceEnabled()) { 718 LOG.trace( 719 "freed " + StringUtils.byteDesc(bytesFreed) + " from single and multi buckets"); 720 } 721 bytesFreed += bucketMemory.free(bytesToFree - bytesFreed); 722 if (LOG.isTraceEnabled()) { 723 LOG.trace( 724 "freed " + StringUtils.byteDesc(bytesFreed) + " total from all three buckets "); 725 } 726 } else { 727 // this means no need to evict 
block in memory bucket, 728 // and we try best to make the ratio between single-bucket and 729 // multi-bucket is 1:2 730 long bytesRemain = s + m - bytesToFree; 731 if (3 * s <= bytesRemain) { 732 // single-bucket is small enough that no eviction happens for it 733 // hence all eviction goes from multi-bucket 734 bytesFreed = bucketMulti.free(bytesToFree); 735 } else if (3 * m <= 2 * bytesRemain) { 736 // multi-bucket is small enough that no eviction happens for it 737 // hence all eviction goes from single-bucket 738 bytesFreed = bucketSingle.free(bytesToFree); 739 } else { 740 // both buckets need to evict some blocks 741 bytesFreed = bucketSingle.free(s - bytesRemain / 3); 742 if (bytesFreed < bytesToFree) { 743 bytesFreed += bucketMulti.free(bytesToFree - bytesFreed); 744 } 745 } 746 } 747 } else { 748 PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3); 749 750 bucketQueue.add(bucketSingle); 751 bucketQueue.add(bucketMulti); 752 bucketQueue.add(bucketMemory); 753 754 int remainingBuckets = bucketQueue.size(); 755 756 BlockBucket bucket; 757 while ((bucket = bucketQueue.poll()) != null) { 758 long overflow = bucket.overflow(); 759 if (overflow > 0) { 760 long bucketBytesToFree = 761 Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets); 762 bytesFreed += bucket.free(bucketBytesToFree); 763 } 764 remainingBuckets--; 765 } 766 } 767 if (LOG.isTraceEnabled()) { 768 long single = bucketSingle.totalSize(); 769 long multi = bucketMulti.totalSize(); 770 long memory = bucketMemory.totalSize(); 771 LOG.trace( 772 "Block cache LRU eviction completed; " + "freed=" + StringUtils.byteDesc(bytesFreed) 773 + ", " + "total=" + StringUtils.byteDesc(this.size.get()) + ", " + "single=" 774 + StringUtils.byteDesc(single) + ", " + "multi=" + StringUtils.byteDesc(multi) + ", " 775 + "memory=" + StringUtils.byteDesc(memory)); 776 } 777 } finally { 778 stats.evict(); 779 evictionInProgress = false; 780 evictionLock.unlock(); 781 } 782 } 783 784 @Override 785 
public String toString() { 786 return MoreObjects.toStringHelper(this).add("blockCount", getBlockCount()) 787 .add("currentSize", StringUtils.byteDesc(getCurrentSize())) 788 .add("freeSize", StringUtils.byteDesc(getFreeSize())) 789 .add("maxSize", StringUtils.byteDesc(getMaxSize())) 790 .add("heapSize", StringUtils.byteDesc(heapSize())) 791 .add("minSize", StringUtils.byteDesc(minSize())).add("minFactor", minFactor) 792 .add("multiSize", StringUtils.byteDesc(multiSize())).add("multiFactor", multiFactor) 793 .add("singleSize", StringUtils.byteDesc(singleSize())).add("singleFactor", singleFactor) 794 .toString(); 795 } 796 797 /** 798 * Used to group blocks into priority buckets. There will be a BlockBucket for each priority 799 * (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate number of 800 * elements out of each according to configuration parameters and their relatives sizes. 801 */ 802 private class BlockBucket implements Comparable<BlockBucket> { 803 804 private final String name; 805 private LruCachedBlockQueue queue; 806 private long totalSize = 0; 807 private long bucketSize; 808 809 public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) { 810 this.name = name; 811 this.bucketSize = bucketSize; 812 queue = new LruCachedBlockQueue(bytesToFree, blockSize); 813 totalSize = 0; 814 } 815 816 public void add(LruCachedBlock block) { 817 totalSize += block.heapSize(); 818 queue.add(block); 819 } 820 821 public long free(long toFree) { 822 if (LOG.isTraceEnabled()) { 823 LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this); 824 } 825 LruCachedBlock cb; 826 long freedBytes = 0; 827 while ((cb = queue.pollLast()) != null) { 828 freedBytes += evictBlock(cb, true); 829 if (freedBytes >= toFree) { 830 return freedBytes; 831 } 832 } 833 if (LOG.isTraceEnabled()) { 834 LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this); 835 } 836 return freedBytes; 837 } 838 839 public 
long overflow() { 840 return totalSize - bucketSize; 841 } 842 843 public long totalSize() { 844 return totalSize; 845 } 846 847 @Override 848 public int compareTo(BlockBucket that) { 849 return Long.compare(this.overflow(), that.overflow()); 850 } 851 852 @Override 853 public boolean equals(Object that) { 854 if (that == null || !(that instanceof BlockBucket)) { 855 return false; 856 } 857 return compareTo((BlockBucket) that) == 0; 858 } 859 860 @Override 861 public int hashCode() { 862 return Objects.hashCode(name, bucketSize, queue, totalSize); 863 } 864 865 @Override 866 public String toString() { 867 return MoreObjects.toStringHelper(this).add("name", name) 868 .add("totalSize", StringUtils.byteDesc(totalSize)) 869 .add("bucketSize", StringUtils.byteDesc(bucketSize)).toString(); 870 } 871 } 872 873 /** 874 * Get the maximum size of this cache. 875 * @return max size in bytes 876 */ 877 878 @Override 879 public long getMaxSize() { 880 return this.maxSize; 881 } 882 883 @Override 884 public long getCurrentSize() { 885 return this.size.get(); 886 } 887 888 @Override 889 public long getCurrentDataSize() { 890 return this.dataBlockSize.sum(); 891 } 892 893 public long getCurrentIndexSize() { 894 return this.indexBlockSize.sum(); 895 } 896 897 public long getCurrentBloomSize() { 898 return this.bloomBlockSize.sum(); 899 } 900 901 @Override 902 public long getFreeSize() { 903 return getMaxSize() - getCurrentSize(); 904 } 905 906 @Override 907 public long size() { 908 return getMaxSize(); 909 } 910 911 @Override 912 public long getBlockCount() { 913 return this.elements.get(); 914 } 915 916 @Override 917 public long getDataBlockCount() { 918 return this.dataBlockElements.sum(); 919 } 920 921 public long getIndexBlockCount() { 922 return this.indexBlockElements.sum(); 923 } 924 925 public long getBloomBlockCount() { 926 return this.bloomBlockElements.sum(); 927 } 928 929 EvictionThread getEvictionThread() { 930 return this.evictionThread; 931 } 932 933 /* 934 * 
Eviction thread. Sits in waiting state until an eviction is triggered when the cache size grows 935 * above the acceptable level.<p> Thread is triggered into action by {@link 936 * LruBlockCache#runEviction()} 937 */ 938 static class EvictionThread extends Thread { 939 940 private WeakReference<LruBlockCache> cache; 941 private volatile boolean go = true; 942 // flag set after enter the run method, used for test 943 private boolean enteringRun = false; 944 945 public EvictionThread(LruBlockCache cache) { 946 super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread"); 947 setDaemon(true); 948 this.cache = new WeakReference<>(cache); 949 } 950 951 @Override 952 public void run() { 953 enteringRun = true; 954 while (this.go) { 955 synchronized (this) { 956 try { 957 this.wait(1000 * 10/* Don't wait for ever */); 958 } catch (InterruptedException e) { 959 LOG.warn("Interrupted eviction thread ", e); 960 Thread.currentThread().interrupt(); 961 } 962 } 963 LruBlockCache cache = this.cache.get(); 964 if (cache == null) { 965 this.go = false; 966 break; 967 } 968 cache.evict(); 969 } 970 } 971 972 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY", 973 justification = "This is what we want") 974 public void evict() { 975 synchronized (this) { 976 this.notifyAll(); 977 } 978 } 979 980 synchronized void shutdown() { 981 this.go = false; 982 this.notifyAll(); 983 } 984 985 public boolean isGo() { 986 return go; 987 } 988 989 /** 990 * Used for the test. 991 */ 992 boolean isEnteringRun() { 993 return this.enteringRun; 994 } 995 } 996 997 /* 998 * Statistics thread. Periodically prints the cache statistics to the log. 
999 */ 1000 static class StatisticsThread extends Thread { 1001 1002 private final LruBlockCache lru; 1003 1004 public StatisticsThread(LruBlockCache lru) { 1005 super("LruBlockCacheStats"); 1006 setDaemon(true); 1007 this.lru = lru; 1008 } 1009 1010 @Override 1011 public void run() { 1012 lru.logStats(); 1013 } 1014 } 1015 1016 public void logStats() { 1017 // Log size 1018 long usedSize = heapSize(); 1019 long freeSize = maxSize - usedSize; 1020 LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(maxSize) + ", " + "usedSize=" 1021 + StringUtils.byteDesc(usedSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", " 1022 + "max=" + StringUtils.byteDesc(this.maxSize) + ", " + "blockCount=" + getBlockCount() + ", " 1023 + "accesses=" + stats.getRequestCount() + ", " + "hits=" + stats.getHitCount() + ", " 1024 + "hitRatio=" 1025 + (stats.getHitCount() == 0 1026 ? "0" 1027 : (StringUtils.formatPercent(stats.getHitRatio(), 2) + ", ")) 1028 + ", " + "cachingAccesses=" + stats.getRequestCachingCount() + ", " + "cachingHits=" 1029 + stats.getHitCachingCount() + ", " + "cachingHitsRatio=" 1030 + (stats.getHitCachingCount() == 0 1031 ? "0," 1032 : (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) 1033 + "evictions=" + stats.getEvictionCount() + ", " + "evicted=" + stats.getEvictedCount() + ", " 1034 + "evictedPerRun=" + stats.evictedPerEviction()); 1035 } 1036 1037 /** 1038 * Get counter statistics for this cache. 1039 * <p> 1040 * Includes: total accesses, hits, misses, evicted blocks, and runs of the eviction processes. 
1041 */ 1042 @Override 1043 public CacheStats getStats() { 1044 return this.stats; 1045 } 1046 1047 public final static long CACHE_FIXED_OVERHEAD = 1048 ClassSize.estimateBase(LruBlockCache.class, false); 1049 1050 @Override 1051 public long heapSize() { 1052 return getCurrentSize(); 1053 } 1054 1055 private static long calculateOverhead(long maxSize, long blockSize, int concurrency) { 1056 // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG 1057 return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP 1058 + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY) 1059 + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT); 1060 } 1061 1062 @Override 1063 public Iterator<CachedBlock> iterator() { 1064 final Iterator<LruCachedBlock> iterator = map.values().iterator(); 1065 1066 return new Iterator<CachedBlock>() { 1067 private final long now = System.nanoTime(); 1068 1069 @Override 1070 public boolean hasNext() { 1071 return iterator.hasNext(); 1072 } 1073 1074 @Override 1075 public CachedBlock next() { 1076 final LruCachedBlock b = iterator.next(); 1077 return new CachedBlock() { 1078 @Override 1079 public String toString() { 1080 return BlockCacheUtil.toString(this, now); 1081 } 1082 1083 @Override 1084 public BlockPriority getBlockPriority() { 1085 return b.getPriority(); 1086 } 1087 1088 @Override 1089 public BlockType getBlockType() { 1090 return b.getBuffer().getBlockType(); 1091 } 1092 1093 @Override 1094 public long getOffset() { 1095 return b.getCacheKey().getOffset(); 1096 } 1097 1098 @Override 1099 public long getSize() { 1100 return b.getBuffer().heapSize(); 1101 } 1102 1103 @Override 1104 public long getCachedTime() { 1105 return b.getCachedTime(); 1106 } 1107 1108 @Override 1109 public String getFilename() { 1110 return b.getCacheKey().getHfileName(); 1111 } 1112 1113 @Override 1114 public int compareTo(CachedBlock other) { 1115 int diff = this.getFilename().compareTo(other.getFilename()); 1116 if (diff != 0) { 1117 
return diff; 1118 } 1119 diff = Long.compare(this.getOffset(), other.getOffset()); 1120 if (diff != 0) { 1121 return diff; 1122 } 1123 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { 1124 throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime()); 1125 } 1126 return Long.compare(other.getCachedTime(), this.getCachedTime()); 1127 } 1128 1129 @Override 1130 public int hashCode() { 1131 return b.hashCode(); 1132 } 1133 1134 @Override 1135 public boolean equals(Object obj) { 1136 if (obj instanceof CachedBlock) { 1137 CachedBlock cb = (CachedBlock) obj; 1138 return compareTo(cb) == 0; 1139 } else { 1140 return false; 1141 } 1142 } 1143 }; 1144 } 1145 1146 @Override 1147 public void remove() { 1148 throw new UnsupportedOperationException(); 1149 } 1150 }; 1151 } 1152 1153 // Simple calculators of sizes given factors and maxSize 1154 1155 long acceptableSize() { 1156 return (long) Math.floor(this.maxSize * this.acceptableFactor); 1157 } 1158 1159 private long minSize() { 1160 return (long) Math.floor(this.maxSize * this.minFactor); 1161 } 1162 1163 private long singleSize() { 1164 return (long) Math.floor(this.maxSize * this.singleFactor * this.minFactor); 1165 } 1166 1167 private long multiSize() { 1168 return (long) Math.floor(this.maxSize * this.multiFactor * this.minFactor); 1169 } 1170 1171 private long memorySize() { 1172 return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor); 1173 } 1174 1175 @Override 1176 public void shutdown() { 1177 if (victimHandler != null) { 1178 victimHandler.shutdown(); 1179 } 1180 this.scheduleThreadPool.shutdown(); 1181 for (int i = 0; i < 10; i++) { 1182 if (!this.scheduleThreadPool.isShutdown()) { 1183 try { 1184 Thread.sleep(10); 1185 } catch (InterruptedException e) { 1186 LOG.warn("Interrupted while sleeping"); 1187 Thread.currentThread().interrupt(); 1188 break; 1189 } 1190 } 1191 } 1192 1193 if (!this.scheduleThreadPool.isShutdown()) { 1194 List<Runnable> runnables = 
this.scheduleThreadPool.shutdownNow(); 1195 LOG.debug("Still running " + runnables); 1196 } 1197 this.evictionThread.shutdown(); 1198 } 1199 1200 /** Clears the cache. Used in tests. */ 1201 public void clearCache() { 1202 this.map.clear(); 1203 this.elements.set(0); 1204 } 1205 1206 /** 1207 * Used in testing. May be very inefficient. 1208 * @return the set of cached file names 1209 */ 1210 SortedSet<String> getCachedFileNamesForTest() { 1211 SortedSet<String> fileNames = new TreeSet<>(); 1212 for (BlockCacheKey cacheKey : map.keySet()) { 1213 fileNames.add(cacheKey.getHfileName()); 1214 } 1215 return fileNames; 1216 } 1217 1218 public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() { 1219 Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class); 1220 for (LruCachedBlock block : map.values()) { 1221 DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding(); 1222 Integer count = counts.get(encoding); 1223 counts.put(encoding, (count == null ? 0 : count) + 1); 1224 } 1225 return counts; 1226 } 1227 1228 Map<BlockCacheKey, LruCachedBlock> getMapForTests() { 1229 return map; 1230 } 1231 1232 @Override 1233 public BlockCache[] getBlockCaches() { 1234 if (victimHandler != null) { 1235 return new BlockCache[] { this, this.victimHandler }; 1236 } 1237 return null; 1238 } 1239}