001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; 021 022import java.io.File; 023import java.io.FileInputStream; 024import java.io.FileOutputStream; 025import java.io.IOException; 026import java.nio.ByteBuffer; 027import java.util.ArrayList; 028import java.util.Collections; 029import java.util.Comparator; 030import java.util.HashSet; 031import java.util.Iterator; 032import java.util.List; 033import java.util.Map; 034import java.util.NavigableSet; 035import java.util.Optional; 036import java.util.PriorityQueue; 037import java.util.Set; 038import java.util.concurrent.ArrayBlockingQueue; 039import java.util.concurrent.BlockingQueue; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.ConcurrentMap; 042import java.util.concurrent.ConcurrentSkipListSet; 043import java.util.concurrent.Executors; 044import java.util.concurrent.ScheduledExecutorService; 045import java.util.concurrent.TimeUnit; 046import java.util.concurrent.atomic.AtomicBoolean; 047import java.util.concurrent.atomic.AtomicLong; 048import java.util.concurrent.atomic.LongAdder; 049import java.util.concurrent.locks.Lock; 050import java.util.concurrent.locks.ReentrantLock; 051import java.util.concurrent.locks.ReentrantReadWriteLock; 052import java.util.function.Consumer; 053import java.util.function.Function; 054import org.apache.commons.io.IOUtils; 055import org.apache.commons.lang3.mutable.MutableInt; 056import org.apache.hadoop.conf.Configuration; 057import org.apache.hadoop.fs.Path; 058import org.apache.hadoop.hbase.HBaseConfiguration; 059import org.apache.hadoop.hbase.HBaseIOException; 060import org.apache.hadoop.hbase.TableName; 061import org.apache.hadoop.hbase.client.Admin; 062import org.apache.hadoop.hbase.io.ByteBuffAllocator; 063import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 064import org.apache.hadoop.hbase.io.HeapSize; 065import org.apache.hadoop.hbase.io.hfile.BlockCache; 066import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 067import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; 068import org.apache.hadoop.hbase.io.hfile.BlockPriority; 069import org.apache.hadoop.hbase.io.hfile.BlockType; 070import org.apache.hadoop.hbase.io.hfile.CacheConfig; 071import org.apache.hadoop.hbase.io.hfile.CacheStats; 072import org.apache.hadoop.hbase.io.hfile.Cacheable; 073import org.apache.hadoop.hbase.io.hfile.CachedBlock; 074import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; 075import org.apache.hadoop.hbase.io.hfile.HFileBlock; 076import org.apache.hadoop.hbase.io.hfile.HFileContext; 077import org.apache.hadoop.hbase.nio.ByteBuff; 078import 
org.apache.hadoop.hbase.nio.RefCnt; 079import org.apache.hadoop.hbase.protobuf.ProtobufMagic; 080import org.apache.hadoop.hbase.util.Bytes; 081import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 082import org.apache.hadoop.hbase.util.IdReadWriteLock; 083import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType; 084import org.apache.hadoop.hbase.util.Pair; 085import org.apache.hadoop.util.StringUtils; 086import org.apache.yetus.audience.InterfaceAudience; 087import org.slf4j.Logger; 088import org.slf4j.LoggerFactory; 089 090import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 091import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 092 093import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos; 094 095/** 096 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses BucketCache#ramCache 097 * and BucketCache#backingMap in order to determine if a given element is in the cache. The bucket 098 * cache can use off-heap memory {@link ByteBufferIOEngine} or mmap 099 * {@link ExclusiveMemoryMmapIOEngine} or pmem {@link SharedMemoryMmapIOEngine} or local files 100 * {@link FileIOEngine} to store/read the block data. 101 * <p> 102 * Eviction is via a similar algorithm as used in 103 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache} 104 * <p> 105 * BucketCache can be used as mainly a block cache (see 106 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with a BlockCache to 107 * decrease CMS GC and heap fragmentation. 108 * <p> 109 * It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to 110 * enlarge cache space via a victim cache. 111 */ 112@InterfaceAudience.Private 113public class BucketCache implements BlockCache, HeapSize { 114 private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class); 115 116 /** Priority buckets config */ 117 static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor"; 118 static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor"; 119 static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor"; 120 static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor"; 121 static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor"; 122 static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor"; 123 124 /** Priority buckets */ 125 static final float DEFAULT_SINGLE_FACTOR = 0.25f; 126 static final float DEFAULT_MULTI_FACTOR = 0.50f; 127 static final float DEFAULT_MEMORY_FACTOR = 0.25f; 128 static final float DEFAULT_MIN_FACTOR = 0.85f; 129 130 private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f; 131 private static final float DEFAULT_ACCEPT_FACTOR = 0.95f; 132 133 // Number of blocks to clear for each of the bucket size that is full 134 private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2; 135 136 /** Statistics thread */ 137 private static final int statThreadPeriod = 5 * 60; 138 139 final static int DEFAULT_WRITER_THREADS = 3; 140 final static int DEFAULT_WRITER_QUEUE_ITEMS = 64; 141 142 // Store/read block data 143 transient final IOEngine ioEngine; 144 145 // Store the block in this map before writing it to cache 146 transient final RAMCache ramCache; 147 148 // In this map, store the block's meta data like offset, length 149 transient Map<BlockCacheKey, BucketEntry> backingMap; 150 151 private AtomicBoolean backingMapValidated = new 
AtomicBoolean(false); 152 153 /** 154 * Map of hFile -> Region -> File size. This map is used to track all files completed prefetch, 155 * together with the region those belong to and the total cached size for the 156 * region.TestBlockEvictionOnRegionMovement 157 */ 158 final Map<String, Pair<String, Long>> fullyCachedFiles = new ConcurrentHashMap<>(); 159 /** 160 * Map of region -> total size of the region prefetched on this region server. This is the total 161 * size of hFiles for this region prefetched on this region server 162 */ 163 final Map<String, Long> regionCachedSizeMap = new ConcurrentHashMap<>(); 164 165 private BucketCachePersister cachePersister; 166 167 /** 168 * Flag if the cache is enabled or not... We shut it off if there are IO errors for some time, so 169 * that Bucket IO exceptions/errors don't bring down the HBase server. 170 */ 171 private volatile boolean cacheEnabled; 172 173 /** 174 * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other 175 * words, the work adding blocks to the BucketCache is divided up amongst the running 176 * WriterThreads. Its done by taking hash of the cache key modulo queue count. WriterThread when 177 * it runs takes whatever has been recently added and 'drains' the entries to the BucketCache. It 178 * then updates the ramCache and backingMap accordingly. 179 */ 180 transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>(); 181 transient final WriterThread[] writerThreads; 182 183 /** Volatile boolean to track if free space is in process or not */ 184 private volatile boolean freeInProgress = false; 185 private transient final Lock freeSpaceLock = new ReentrantLock(); 186 187 private final LongAdder realCacheSize = new LongAdder(); 188 private final LongAdder heapSize = new LongAdder(); 189 /** Current number of cached elements */ 190 private final LongAdder blockNumber = new LongAdder(); 191 192 /** Cache access count (sequential ID) */ 193 private final AtomicLong accessCount = new AtomicLong(); 194 195 private static final int DEFAULT_CACHE_WAIT_TIME = 50; 196 197 private final BucketCacheStats cacheStats = new BucketCacheStats(); 198 private final String persistencePath; 199 static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false); 200 private final long cacheCapacity; 201 /** Approximate block size */ 202 private final long blockSize; 203 204 /** Duration of IO errors tolerated before we disable cache, 1 min as default */ 205 private final int ioErrorsTolerationDuration; 206 // 1 min 207 public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000; 208 209 // Start time of first IO error when reading or writing IO Engine, it will be 210 // reset after a successful read/write. 211 private volatile long ioErrorStartTime = -1; 212 213 /** 214 * A ReentrantReadWriteLock to lock on a particular block identified by offset. The purpose of 215 * this is to avoid freeing the block which is being read. 216 * <p> 217 * Key set of offsets in BucketCache is limited so soft reference is the best choice here. 
218 */ 219 transient final IdReadWriteLock<Long> offsetLock = new IdReadWriteLock<>(ReferenceType.SOFT); 220 221 NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>( 222 Comparator.comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset)); 223 224 /** Statistics thread schedule pool (for heavy debugging, could remove) */ 225 private transient final ScheduledExecutorService scheduleThreadPool = 226 Executors.newScheduledThreadPool(1, 227 new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build()); 228 229 // Allocate or free space for the block 230 private transient BucketAllocator bucketAllocator; 231 232 /** Acceptable size of cache (no evictions if size < acceptable) */ 233 private float acceptableFactor; 234 235 /** Minimum threshold of cache (when evicting, evict until size < min) */ 236 private float minFactor; 237 238 /** 239 * Free this floating point factor of extra blocks when evicting. For example free the number of 240 * blocks requested * (1 + extraFreeFactor) 241 */ 242 private float extraFreeFactor; 243 244 /** Single access bucket size */ 245 private float singleFactor; 246 247 /** Multiple access bucket size */ 248 private float multiFactor; 249 250 /** In-memory bucket size */ 251 private float memoryFactor; 252 253 private long bucketcachePersistInterval; 254 255 private static final String FILE_VERIFY_ALGORITHM = 256 "hbase.bucketcache.persistent.file.integrity.check.algorithm"; 257 private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5"; 258 259 private static final String QUEUE_ADDITION_WAIT_TIME = 260 "hbase.bucketcache.queue.addition.waittime"; 261 private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0; 262 private long queueAdditionWaitTime; 263 /** 264 * Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file 265 * integrity, default algorithm is MD5 266 */ 267 private String algorithm; 268 269 /* Tracing failed Bucket Cache allocations. */ 270 private long allocFailLogPrevTs; // time of previous log event for allocation failure. 271 private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute. 272 273 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 274 int writerThreadNum, int writerQLen, String persistencePath) throws IOException { 275 this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 276 persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create()); 277 } 278 279 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 280 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, 281 Configuration conf) throws IOException { 282 Preconditions.checkArgument(blockSize > 0, 283 "BucketCache capacity is set to " + blockSize + ", can not be less than 0"); 284 this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM); 285 this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath); 286 this.writerThreads = new WriterThread[writerThreadNum]; 287 long blockNumCapacity = capacity / blockSize; 288 if (blockNumCapacity >= Integer.MAX_VALUE) { 289 // Enough for about 32TB of cache! 
290 throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now"); 291 } 292 293 this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR); 294 this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR); 295 this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR); 296 this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); 297 this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); 298 this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); 299 this.queueAdditionWaitTime = 300 conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME); 301 this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000); 302 303 sanityCheckConfigs(); 304 305 LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor 306 + ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " 307 + singleFactor + ", multiFactor: " + multiFactor + ", memoryFactor: " + memoryFactor); 308 309 this.cacheCapacity = capacity; 310 this.persistencePath = persistencePath; 311 this.blockSize = blockSize; 312 this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; 313 314 this.allocFailLogPrevTs = 0; 315 316 for (int i = 0; i < writerThreads.length; ++i) { 317 writerQueues.add(new ArrayBlockingQueue<>(writerQLen)); 318 } 319 320 assert writerQueues.size() == writerThreads.length; 321 this.ramCache = new RAMCache(); 322 323 this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); 324 325 if (isCachePersistent()) { 326 if (ioEngine instanceof FileIOEngine) { 327 startBucketCachePersisterThread(); 328 } 329 try { 330 retrieveFromFile(bucketSizes); 331 } catch (IOException ioex) { 332 LOG.error("Can't restore from file[{}] because of ", persistencePath, ioex); 333 backingMap.clear(); 334 fullyCachedFiles.clear(); 335 backingMapValidated.set(true); 336 bucketAllocator = new BucketAllocator(capacity, bucketSizes); 337 regionCachedSizeMap.clear(); 338 } 339 } else { 340 bucketAllocator = new BucketAllocator(capacity, bucketSizes); 341 } 342 final String threadName = Thread.currentThread().getName(); 343 this.cacheEnabled = true; 344 for (int i = 0; i < writerThreads.length; ++i) { 345 writerThreads[i] = new WriterThread(writerQueues.get(i)); 346 writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); 347 writerThreads[i].setDaemon(true); 348 } 349 startWriterThreads(); 350 351 // Run the statistics thread periodically to print the cache statistics log 352 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log 353 // every five minutes. 
354 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, 355 statThreadPeriod, TimeUnit.SECONDS); 356 LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity=" 357 + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize) 358 + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" 359 + persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName()); 360 } 361 362 private void sanityCheckConfigs() { 363 Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0, 364 ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 365 Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0, 366 MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 367 Preconditions.checkArgument(minFactor <= acceptableFactor, 368 MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME); 369 Preconditions.checkArgument(extraFreeFactor >= 0, 370 EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0"); 371 Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0, 372 SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 373 Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0, 374 MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 375 Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0, 376 MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 377 Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1, 378 SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and " 379 + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0"); 380 } 381 382 /** 383 * Called by the constructor to start the writer threads. Used by tests that need to override 384 * starting the threads. 385 */ 386 protected void startWriterThreads() { 387 for (WriterThread thread : writerThreads) { 388 thread.start(); 389 } 390 } 391 392 void startBucketCachePersisterThread() { 393 LOG.info("Starting BucketCachePersisterThread"); 394 cachePersister = new BucketCachePersister(this, bucketcachePersistInterval); 395 cachePersister.setDaemon(true); 396 cachePersister.start(); 397 } 398 399 boolean isCacheEnabled() { 400 return this.cacheEnabled; 401 } 402 403 @Override 404 public long getMaxSize() { 405 return this.cacheCapacity; 406 } 407 408 public String getIoEngine() { 409 return ioEngine.toString(); 410 } 411 412 /** 413 * Get the IOEngine from the IO engine name 414 * @return the IOEngine 415 */ 416 private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath) 417 throws IOException { 418 if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) { 419 // In order to make the usage simple, we only need the prefix 'files:' in 420 // document whether one or multiple file(s), but also support 'file:' for 421 // the compatibility 422 String[] filePaths = 423 ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER); 424 return new FileIOEngine(capacity, persistencePath != null, filePaths); 425 } else if (ioEngineName.startsWith("offheap")) { 426 return new ByteBufferIOEngine(capacity); 427 } else if (ioEngineName.startsWith("mmap:")) { 428 return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity); 429 } else if (ioEngineName.startsWith("pmem:")) { 430 // This mode of bucket cache creates an IOEngine over a file on the persistent memory 431 // device. 
Since the persistent memory device has its own address space the contents 432 // mapped to this address space does not get swapped out like in the case of mmapping 433 // on to DRAM. Hence the cells created out of the hfile blocks in the pmem bucket cache 434 // can be directly referred to without having to copy them onheap. Once the RPC is done, 435 // the blocks can be returned back as in case of ByteBufferIOEngine. 436 return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity); 437 } else { 438 throw new IllegalArgumentException( 439 "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap"); 440 } 441 } 442 443 public boolean isCachePersistenceEnabled() { 444 return persistencePath != null; 445 } 446 447 /** 448 * Cache the block with the specified name and buffer. 449 * @param cacheKey block's cache key 450 * @param buf block buffer 451 */ 452 @Override 453 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 454 cacheBlock(cacheKey, buf, false); 455 } 456 457 /** 458 * Cache the block with the specified name and buffer. 459 * @param cacheKey block's cache key 460 * @param cachedItem block buffer 461 * @param inMemory if block is in-memory 462 */ 463 @Override 464 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) { 465 cacheBlockWithWait(cacheKey, cachedItem, inMemory, false); 466 } 467 468 /** 469 * Cache the block with the specified name and buffer. 470 * @param cacheKey block's cache key 471 * @param cachedItem block buffer 472 * @param inMemory if block is in-memory 473 */ 474 @Override 475 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, 476 boolean waitWhenCache) { 477 cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0); 478 } 479 480 /** 481 * Cache the block to ramCache 482 * @param cacheKey block's cache key 483 * @param cachedItem block buffer 484 * @param inMemory if block is in-memory 485 * @param wait if true, blocking wait when queue is full 486 */ 487 public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, 488 boolean wait) { 489 if (cacheEnabled) { 490 if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { 491 if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) { 492 BucketEntry bucketEntry = backingMap.get(cacheKey); 493 if (bucketEntry != null && bucketEntry.isRpcRef()) { 494 // avoid replace when there are RPC refs for the bucket entry in bucket cache 495 return; 496 } 497 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 498 } 499 } else { 500 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 501 } 502 } 503 } 504 505 protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) { 506 return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock); 507 } 508 509 protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, 510 boolean inMemory, boolean wait) { 511 if (!cacheEnabled) { 512 return; 513 } 514 if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) { 515 cacheKey.setBlockType(cachedItem.getBlockType()); 516 } 517 LOG.trace("Caching key={}, item={}", cacheKey, cachedItem); 518 // Stuff the entry into the RAM cache so it can get drained to the persistent store 519 RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), 520 inMemory, isCachePersistent() && ioEngine 
instanceof FileIOEngine); 521 /** 522 * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same 523 * key in ramCache, the heap size of bucket cache need to update if replacing entry from 524 * ramCache. But WriterThread will also remove entry from ramCache and update heap size, if 525 * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct 526 * one, then the heap size will mess up (HBASE-20789) 527 */ 528 if (ramCache.putIfAbsent(cacheKey, re) != null) { 529 return; 530 } 531 int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); 532 BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum); 533 boolean successfulAddition = false; 534 if (wait) { 535 try { 536 successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS); 537 } catch (InterruptedException e) { 538 Thread.currentThread().interrupt(); 539 } 540 } else { 541 successfulAddition = bq.offer(re); 542 } 543 if (!successfulAddition) { 544 ramCache.remove(cacheKey); 545 cacheStats.failInsert(); 546 } else { 547 this.blockNumber.increment(); 548 this.heapSize.add(cachedItem.heapSize()); 549 } 550 } 551 552 /** 553 * Get the buffer of the block with the specified key. 554 * @param key block's cache key 555 * @param caching true if the caller caches blocks on cache misses 556 * @param repeat Whether this is a repeat lookup for the same block 557 * @param updateCacheMetrics Whether we should update cache metrics or not 558 * @return buffer of specified cache key, or null if not in cache 559 */ 560 @Override 561 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, 562 boolean updateCacheMetrics) { 563 if (!cacheEnabled) { 564 return null; 565 } 566 RAMQueueEntry re = ramCache.get(key); 567 if (re != null) { 568 if (updateCacheMetrics) { 569 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 570 } 571 re.access(accessCount.incrementAndGet()); 572 return re.getData(); 573 } 574 BucketEntry bucketEntry = backingMap.get(key); 575 if (bucketEntry != null) { 576 long start = System.nanoTime(); 577 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); 578 try { 579 lock.readLock().lock(); 580 // We can not read here even if backingMap does contain the given key because its offset 581 // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check 582 // existence here. 583 if (bucketEntry.equals(backingMap.get(key))) { 584 // Read the block from IOEngine based on the bucketEntry's offset and length, NOTICE: the 585 // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to 586 // the same BucketEntry, then all of the three will share the same refCnt. 587 Cacheable cachedBlock = ioEngine.read(bucketEntry); 588 if (ioEngine.usesSharedMemory()) { 589 // If IOEngine use shared memory, cachedBlock and BucketEntry will share the 590 // same RefCnt, do retain here, in order to count the number of RPC references 591 cachedBlock.retain(); 592 } 593 // Update the cache statistics. 594 if (updateCacheMetrics) { 595 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 596 cacheStats.ioHit(System.nanoTime() - start); 597 } 598 bucketEntry.access(accessCount.incrementAndGet()); 599 if (this.ioErrorStartTime > 0) { 600 ioErrorStartTime = -1; 601 } 602 return cachedBlock; 603 } 604 } catch (HBaseIOException hioex) { 605 // When using file io engine persistent cache, 606 // the cache map state might differ from the actual cache. 
If we reach this block, 607 // we should remove the cache key entry from the backing map 608 backingMap.remove(key); 609 removeFileFromPrefetch(key.getHfileName()); 610 LOG.debug("Failed to fetch block for cache key: {}.", key, hioex); 611 } catch (IOException ioex) { 612 LOG.error("Failed reading block " + key + " from bucket cache", ioex); 613 checkIOErrorIsTolerated(); 614 } finally { 615 lock.readLock().unlock(); 616 } 617 } 618 if (!repeat && updateCacheMetrics) { 619 cacheStats.miss(caching, key.isPrimary(), key.getBlockType()); 620 } 621 return null; 622 } 623 624 /** 625 * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap} 626 */ 627 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber, 628 boolean evictedByEvictionProcess) { 629 bucketEntry.markAsEvicted(); 630 blocksByHFile.remove(cacheKey); 631 if (decrementBlockNumber) { 632 this.blockNumber.decrement(); 633 if (ioEngine.isPersistent()) { 634 removeFileFromPrefetch(cacheKey.getHfileName()); 635 } 636 } 637 if (evictedByEvictionProcess) { 638 cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); 639 } 640 if (ioEngine.isPersistent()) { 641 setCacheInconsistent(true); 642 } 643 } 644 645 /** 646 * Free the {{@link BucketEntry} actually,which could only be invoked when the 647 * {@link BucketEntry#refCnt} becoming 0. 648 */ 649 void freeBucketEntry(BucketEntry bucketEntry) { 650 bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength()); 651 realCacheSize.add(-1 * bucketEntry.getLength()); 652 } 653 654 /** 655 * Try to evict the block from {@link BlockCache} by force. We'll call this in few cases:<br> 656 * 1. Close an HFile, and clear all cached blocks. <br> 657 * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br> 658 * <p> 659 * Firstly, we'll try to remove the block from RAMCache,and then try to evict from backingMap. 660 * Here we evict the block from backingMap immediately, but only free the reference from bucket 661 * cache by calling {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this 662 * block, block can only be de-allocated when all of them release the block. 663 * <p> 664 * NOTICE: we need to grab the write offset lock firstly before releasing the reference from 665 * bucket cache. if we don't, we may read an {@link BucketEntry} with refCnt = 0 when 666 * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, it's a memory leak. 667 * @param cacheKey Block to evict 668 * @return true to indicate whether we've evicted successfully or not. 669 */ 670 @Override 671 public boolean evictBlock(BlockCacheKey cacheKey) { 672 return doEvictBlock(cacheKey, null, false); 673 } 674 675 /** 676 * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap} and 677 * {@link BucketCache#ramCache}. <br/> 678 * NOTE:When Evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and 679 * {@link BucketEntry} could be removed. 680 * @param cacheKey {@link BlockCacheKey} to evict. 681 * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict. 682 * @return true to indicate whether we've evicted successfully or not. 
683 */ 684 private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry, 685 boolean evictedByEvictionProcess) { 686 if (!cacheEnabled) { 687 return false; 688 } 689 boolean existedInRamCache = removeFromRamCache(cacheKey); 690 if (bucketEntry == null) { 691 bucketEntry = backingMap.get(cacheKey); 692 } 693 final BucketEntry bucketEntryToUse = bucketEntry; 694 695 if (bucketEntryToUse == null) { 696 if (existedInRamCache && evictedByEvictionProcess) { 697 cacheStats.evicted(0, cacheKey.isPrimary()); 698 } 699 return existedInRamCache; 700 } else { 701 return bucketEntryToUse.withWriteLock(offsetLock, () -> { 702 if (backingMap.remove(cacheKey, bucketEntryToUse)) { 703 LOG.debug("removed key {} from back map with offset lock {} in the evict process", 704 cacheKey, bucketEntryToUse.offset()); 705 blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess); 706 return true; 707 } 708 return false; 709 }); 710 } 711 } 712 713 /** 714 * <pre> 715 * Create the {@link Recycler} for {@link BucketEntry#refCnt},which would be used as 716 * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}. 717 * NOTE: for {@link BucketCache#getBlock},the {@link RefCnt#recycler} of {@link HFileBlock#buf} 718 * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different: 719 * 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap}, 720 * it is the return value of current {@link BucketCache#createRecycler} method. 721 * 722 * 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache}, 723 * it is {@link ByteBuffAllocator#putbackBuffer}. 724 * </pre> 725 */ 726 private Recycler createRecycler(final BucketEntry bucketEntry) { 727 return () -> { 728 freeBucketEntry(bucketEntry); 729 return; 730 }; 731 } 732 733 /** 734 * NOTE: This method is only for test. 735 */ 736 public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) { 737 BucketEntry bucketEntry = backingMap.get(blockCacheKey); 738 if (bucketEntry == null) { 739 return false; 740 } 741 return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry); 742 } 743 744 /** 745 * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if 746 * {@link BucketEntry#isRpcRef} is false. <br/> 747 * NOTE:When evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and 748 * {@link BucketEntry} could be removed. 749 * @param blockCacheKey {@link BlockCacheKey} to evict. 750 * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict. 751 * @return true to indicate whether we've evicted successfully or not. 752 */ 753 boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) { 754 if (!bucketEntry.isRpcRef()) { 755 return doEvictBlock(blockCacheKey, bucketEntry, true); 756 } 757 return false; 758 } 759 760 protected boolean removeFromRamCache(BlockCacheKey cacheKey) { 761 return ramCache.remove(cacheKey, re -> { 762 if (re != null) { 763 this.blockNumber.decrement(); 764 this.heapSize.add(-1 * re.getData().heapSize()); 765 } 766 }); 767 } 768 769 public boolean isCacheInconsistent() { 770 return isCacheInconsistent.get(); 771 } 772 773 public void setCacheInconsistent(boolean setCacheInconsistent) { 774 isCacheInconsistent.set(setCacheInconsistent); 775 } 776 777 /* 778 * Statistics thread. Periodically output cache statistics to the log. 
779 */ 780 private static class StatisticsThread extends Thread { 781 private final BucketCache bucketCache; 782 783 public StatisticsThread(BucketCache bucketCache) { 784 super("BucketCacheStatsThread"); 785 setDaemon(true); 786 this.bucketCache = bucketCache; 787 } 788 789 @Override 790 public void run() { 791 bucketCache.logStats(); 792 } 793 } 794 795 public void logStats() { 796 long totalSize = bucketAllocator.getTotalSize(); 797 long usedSize = bucketAllocator.getUsedSize(); 798 long freeSize = totalSize - usedSize; 799 long cacheSize = getRealCacheSize(); 800 LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + "totalSize=" 801 + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", " 802 + "usedSize=" + StringUtils.byteDesc(usedSize) + ", " + "cacheSize=" 803 + StringUtils.byteDesc(cacheSize) + ", " + "accesses=" + cacheStats.getRequestCount() + ", " 804 + "hits=" + cacheStats.getHitCount() + ", " + "IOhitsPerSecond=" 805 + cacheStats.getIOHitsPerSecond() + ", " + "IOTimePerHit=" 806 + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " + "hitRatio=" 807 + (cacheStats.getHitCount() == 0 808 ? "0," 809 : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", ")) 810 + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits=" 811 + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio=" 812 + (cacheStats.getHitCachingCount() == 0 813 ? "0," 814 : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", ")) 815 + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted=" 816 + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction() 817 + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount()); 818 cacheStats.reset(); 819 820 bucketAllocator.logDebugStatistics(); 821 } 822 823 public long getRealCacheSize() { 824 return this.realCacheSize.sum(); 825 } 826 827 public long acceptableSize() { 828 return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor); 829 } 830 831 long getPartitionSize(float partitionFactor) { 832 return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor); 833 } 834 835 /** 836 * Return the count of bucketSizeinfos still need free space 837 */ 838 private int bucketSizesAboveThresholdCount(float minFactor) { 839 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 840 int fullCount = 0; 841 for (int i = 0; i < stats.length; i++) { 842 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 843 freeGoal = Math.max(freeGoal, 1); 844 if (stats[i].freeCount() < freeGoal) { 845 fullCount++; 846 } 847 } 848 return fullCount; 849 } 850 851 /** 852 * This method will find the buckets that are minimally occupied and are not reference counted and 853 * will free them completely without any constraint on the access times of the elements, and as a 854 * process will completely free at most the number of buckets passed, sometimes it might not due 855 * to changing refCounts 856 * @param completelyFreeBucketsNeeded number of buckets to free 857 **/ 858 private void freeEntireBuckets(int completelyFreeBucketsNeeded) { 859 if (completelyFreeBucketsNeeded != 0) { 860 // First we will build a set where the offsets are reference counted, usually 861 // this set is small around O(Handler Count) unless something else is wrong 862 Set<Integer> inUseBuckets = new HashSet<>(); 863 backingMap.forEach((k, be) -> { 864 if (be.isRpcRef()) { 865 
inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset())); 866 } 867 }); 868 Set<Integer> candidateBuckets = 869 bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded); 870 for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) { 871 if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) { 872 evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue()); 873 } 874 } 875 } 876 } 877 878 /** 879 * Free the space if the used size reaches acceptableSize() or one size block couldn't be 880 * allocated. When freeing the space, we use the LRU algorithm and ensure there must be some 881 * blocks evicted 882 * @param why Why we are being called 883 */ 884 void freeSpace(final String why) { 885 // Ensure only one freeSpace progress at a time 886 if (!freeSpaceLock.tryLock()) { 887 return; 888 } 889 try { 890 freeInProgress = true; 891 long bytesToFreeWithoutExtra = 0; 892 // Calculate free byte for each bucketSizeinfo 893 StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null; 894 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 895 long[] bytesToFreeForBucket = new long[stats.length]; 896 for (int i = 0; i < stats.length; i++) { 897 bytesToFreeForBucket[i] = 0; 898 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 899 freeGoal = Math.max(freeGoal, 1); 900 if (stats[i].freeCount() < freeGoal) { 901 bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount()); 902 bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; 903 if (msgBuffer != null) { 904 msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" 905 + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); 906 } 907 } 908 } 909 if (msgBuffer != null) { 910 msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); 911 } 912 913 if (bytesToFreeWithoutExtra <= 0) { 914 return; 915 } 916 long currentSize = bucketAllocator.getUsedSize(); 917 long totalSize = bucketAllocator.getTotalSize(); 918 if (LOG.isDebugEnabled() && msgBuffer != null) { 919 LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() 920 + " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" 921 + StringUtils.byteDesc(realCacheSize.sum()) + ", total=" 922 + StringUtils.byteDesc(totalSize)); 923 } 924 925 long bytesToFreeWithExtra = 926 (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor)); 927 928 // Instantiate priority buckets 929 BucketEntryGroup bucketSingle = 930 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor)); 931 BucketEntryGroup bucketMulti = 932 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(multiFactor)); 933 BucketEntryGroup bucketMemory = 934 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor)); 935 936 // Scan entire map putting bucket entry into appropriate bucket entry 937 // group 938 for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) { 939 switch (bucketEntryWithKey.getValue().getPriority()) { 940 case SINGLE: { 941 bucketSingle.add(bucketEntryWithKey); 942 break; 943 } 944 case MULTI: { 945 bucketMulti.add(bucketEntryWithKey); 946 break; 947 } 948 case MEMORY: { 949 bucketMemory.add(bucketEntryWithKey); 950 break; 951 } 952 } 953 } 954 955 PriorityQueue<BucketEntryGroup> bucketQueue = 956 new PriorityQueue<>(3, 
Comparator.comparingLong(BucketEntryGroup::overflow)); 957 958 bucketQueue.add(bucketSingle); 959 bucketQueue.add(bucketMulti); 960 bucketQueue.add(bucketMemory); 961 962 int remainingBuckets = bucketQueue.size(); 963 long bytesFreed = 0; 964 965 BucketEntryGroup bucketGroup; 966 while ((bucketGroup = bucketQueue.poll()) != null) { 967 long overflow = bucketGroup.overflow(); 968 if (overflow > 0) { 969 long bucketBytesToFree = 970 Math.min(overflow, (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets); 971 bytesFreed += bucketGroup.free(bucketBytesToFree); 972 } 973 remainingBuckets--; 974 } 975 976 // Check and free if there are buckets that still need freeing of space 977 if (bucketSizesAboveThresholdCount(minFactor) > 0) { 978 bucketQueue.clear(); 979 remainingBuckets = 3; 980 981 bucketQueue.add(bucketSingle); 982 bucketQueue.add(bucketMulti); 983 bucketQueue.add(bucketMemory); 984 985 while ((bucketGroup = bucketQueue.poll()) != null) { 986 long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets; 987 bytesFreed += bucketGroup.free(bucketBytesToFree); 988 remainingBuckets--; 989 } 990 } 991 992 // Even after the above free we might still need freeing because of the 993 // De-fragmentation of the buckets (also called Slab Calcification problem), i.e 994 // there might be some buckets where the occupancy is very sparse and thus are not 995 // yielding the free for the other bucket sizes, the fix for this to evict some 996 // of the buckets, we do this by evicting the buckets that are least fulled 997 freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * bucketSizesAboveThresholdCount(1.0f)); 998 999 if (LOG.isDebugEnabled()) { 1000 long single = bucketSingle.totalSize(); 1001 long multi = bucketMulti.totalSize(); 1002 long memory = bucketMemory.totalSize(); 1003 if (LOG.isDebugEnabled()) { 1004 LOG.debug("Bucket cache free space completed; " + "freed=" 1005 + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc(totalSize) 1006 + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi=" 1007 + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory)); 1008 } 1009 } 1010 1011 } catch (Throwable t) { 1012 LOG.warn("Failed freeing space", t); 1013 } finally { 1014 cacheStats.evict(); 1015 freeInProgress = false; 1016 freeSpaceLock.unlock(); 1017 } 1018 } 1019 1020 // This handles flushing the RAM cache to IOEngine. 
1021 class WriterThread extends Thread { 1022 private final BlockingQueue<RAMQueueEntry> inputQueue; 1023 private volatile boolean writerEnabled = true; 1024 private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE); 1025 1026 WriterThread(BlockingQueue<RAMQueueEntry> queue) { 1027 super("BucketCacheWriterThread"); 1028 this.inputQueue = queue; 1029 } 1030 1031 // Used for test 1032 void disableWriter() { 1033 this.writerEnabled = false; 1034 } 1035 1036 @Override 1037 public void run() { 1038 List<RAMQueueEntry> entries = new ArrayList<>(); 1039 try { 1040 while (cacheEnabled && writerEnabled) { 1041 try { 1042 try { 1043 // Blocks 1044 entries = getRAMQueueEntries(inputQueue, entries); 1045 } catch (InterruptedException ie) { 1046 if (!cacheEnabled || !writerEnabled) { 1047 break; 1048 } 1049 } 1050 doDrain(entries, metaBuff); 1051 } catch (Exception ioe) { 1052 LOG.error("WriterThread encountered error", ioe); 1053 } 1054 } 1055 } catch (Throwable t) { 1056 LOG.warn("Failed doing drain", t); 1057 } 1058 LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled); 1059 } 1060 } 1061 1062 /** 1063 * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing 1064 * cache with a new block for the same cache key. there's a corner case: one thread cache a block 1065 * in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another new block 1066 * with the same cache key do the same thing for the same cache key, so if not evict the previous 1067 * bucket entry, then memory leak happen because the previous bucketEntry is gone but the 1068 * bucketAllocator do not free its memory. 1069 * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey 1070 * cacheKey, Cacheable newBlock) 1071 * @param key Block cache key 1072 * @param bucketEntry Bucket entry to put into backingMap. 1073 */ 1074 protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { 1075 BucketEntry previousEntry = backingMap.put(key, bucketEntry); 1076 blocksByHFile.add(key); 1077 if (previousEntry != null && previousEntry != bucketEntry) { 1078 previousEntry.withWriteLock(offsetLock, () -> { 1079 blockEvicted(key, previousEntry, false, false); 1080 return null; 1081 }); 1082 } 1083 } 1084 1085 /** 1086 * Prepare and return a warning message for Bucket Allocator Exception 1087 * @param fle The exception 1088 * @param re The RAMQueueEntry for which the exception was thrown. 1089 * @return A warning message created from the input RAMQueueEntry object. 
1090 */ 1091 private static String getAllocationFailWarningMessage(final BucketAllocatorException fle, 1092 final RAMQueueEntry re) { 1093 final StringBuilder sb = new StringBuilder(); 1094 sb.append("Most recent failed allocation after "); 1095 sb.append(ALLOCATION_FAIL_LOG_TIME_PERIOD); 1096 sb.append(" ms;"); 1097 if (re != null) { 1098 if (re.getData() instanceof HFileBlock) { 1099 final HFileContext fileContext = ((HFileBlock) re.getData()).getHFileContext(); 1100 final String columnFamily = Bytes.toString(fileContext.getColumnFamily()); 1101 final String tableName = Bytes.toString(fileContext.getTableName()); 1102 if (tableName != null) { 1103 sb.append(" Table: "); 1104 sb.append(tableName); 1105 } 1106 if (columnFamily != null) { 1107 sb.append(" CF: "); 1108 sb.append(columnFamily); 1109 } 1110 sb.append(" HFile: "); 1111 if (fileContext.getHFileName() != null) { 1112 sb.append(fileContext.getHFileName()); 1113 } else { 1114 sb.append(re.getKey()); 1115 } 1116 } else { 1117 sb.append(" HFile: "); 1118 sb.append(re.getKey()); 1119 } 1120 } 1121 sb.append(" Message: "); 1122 sb.append(fle.getMessage()); 1123 return sb.toString(); 1124 } 1125 1126 /** 1127 * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. Process all that 1128 * are passed in even if failure being sure to remove from ramCache else we'll never undo the 1129 * references and we'll OOME. 1130 * @param entries Presumes list passed in here will be processed by this invocation only. No 1131 * interference expected. 1132 */ 1133 void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException { 1134 if (entries.isEmpty()) { 1135 return; 1136 } 1137 // This method is a little hard to follow. We run through the passed in entries and for each 1138 // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must 1139 // do cleanup making sure we've cleared ramCache of all entries regardless of whether we 1140 // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by 1141 // filling ramCache. We do the clean up by again running through the passed in entries 1142 // doing extra work when we find a non-null bucketEntries corresponding entry. 1143 final int size = entries.size(); 1144 BucketEntry[] bucketEntries = new BucketEntry[size]; 1145 // Index updated inside loop if success or if we can't succeed. We retry if cache is full 1146 // when we go to add an entry by going around the loop again without upping the index. 1147 int index = 0; 1148 while (cacheEnabled && index < size) { 1149 RAMQueueEntry re = null; 1150 try { 1151 re = entries.get(index); 1152 if (re == null) { 1153 LOG.warn("Couldn't get entry or changed on us; who else is messing with it?"); 1154 index++; 1155 continue; 1156 } 1157 // Reset the position for reuse. 1158 // It should be guaranteed that the data in the metaBuff has been transferred to the 1159 // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already 1160 // transferred with our current IOEngines. Should take care, when we have new kinds of 1161 // IOEngine in the future. 1162 metaBuff.clear(); 1163 BucketEntry bucketEntry = 1164 re.writeToCache(ioEngine, bucketAllocator, realCacheSize, this::createRecycler, metaBuff); 1165 // Successfully added. Up index and add bucketEntry. Clear io exceptions. 
1166 bucketEntries[index] = bucketEntry; 1167 if (ioErrorStartTime > 0) { 1168 ioErrorStartTime = -1; 1169 } 1170 index++; 1171 } catch (BucketAllocatorException fle) { 1172 long currTs = EnvironmentEdgeManager.currentTime(); 1173 cacheStats.allocationFailed(); // Record the warning. 1174 if ( 1175 allocFailLogPrevTs == 0 || (currTs - allocFailLogPrevTs) > ALLOCATION_FAIL_LOG_TIME_PERIOD 1176 ) { 1177 LOG.warn(getAllocationFailWarningMessage(fle, re)); 1178 allocFailLogPrevTs = currTs; 1179 } 1180 // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below. 1181 bucketEntries[index] = null; 1182 index++; 1183 } catch (CacheFullException cfe) { 1184 // Cache full when we tried to add. Try freeing space and then retrying (don't up index) 1185 if (!freeInProgress) { 1186 freeSpace("Full!"); 1187 } else { 1188 Thread.sleep(50); 1189 } 1190 } catch (IOException ioex) { 1191 // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem. 1192 LOG.error("Failed writing to bucket cache", ioex); 1193 checkIOErrorIsTolerated(); 1194 } 1195 } 1196 1197 // Make sure data pages are written on media before we update maps. 1198 try { 1199 ioEngine.sync(); 1200 } catch (IOException ioex) { 1201 LOG.error("Failed syncing IO engine", ioex); 1202 checkIOErrorIsTolerated(); 1203 // Since we failed sync, free the blocks in bucket allocator 1204 for (int i = 0; i < entries.size(); ++i) { 1205 BucketEntry bucketEntry = bucketEntries[i]; 1206 if (bucketEntry != null) { 1207 bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength()); 1208 bucketEntries[i] = null; 1209 } 1210 } 1211 } 1212 1213 // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if 1214 // success or error. 1215 for (int i = 0; i < size; ++i) { 1216 BlockCacheKey key = entries.get(i).getKey(); 1217 // Only add if non-null entry. 1218 if (bucketEntries[i] != null) { 1219 putIntoBackingMap(key, bucketEntries[i]); 1220 if (ioEngine.isPersistent()) { 1221 setCacheInconsistent(true); 1222 } 1223 } 1224 // Always remove from ramCache even if we failed adding it to the block cache above. 1225 boolean existed = ramCache.remove(key, re -> { 1226 if (re != null) { 1227 heapSize.add(-1 * re.getData().heapSize()); 1228 } 1229 }); 1230 if (!existed && bucketEntries[i] != null) { 1231 // Block should have already been evicted. Remove it and free space. 1232 final BucketEntry bucketEntry = bucketEntries[i]; 1233 bucketEntry.withWriteLock(offsetLock, () -> { 1234 if (backingMap.remove(key, bucketEntry)) { 1235 blockEvicted(key, bucketEntry, false, false); 1236 } 1237 return null; 1238 }); 1239 } 1240 } 1241 1242 long used = bucketAllocator.getUsedSize(); 1243 if (used > acceptableSize()) { 1244 freeSpace("Used=" + used + " > acceptable=" + acceptableSize()); 1245 } 1246 return; 1247 } 1248 1249 /** 1250 * Blocks until elements available in {@code q} then tries to grab as many as possible before 1251 * returning. 1252 * @param receptacle Where to stash the elements taken from queue. We clear before we use it just 1253 * in case. 1254 * @param q The queue to take from. 1255 * @return {@code receptacle} laden with elements taken from the queue or empty if none found. 1256 */ 1257 static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q, 1258 List<RAMQueueEntry> receptacle) throws InterruptedException { 1259 // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it 1260 // ok even if list grew to accommodate thousands. 
1261 receptacle.clear(); 1262 receptacle.add(q.take()); 1263 q.drainTo(receptacle); 1264 return receptacle; 1265 } 1266 1267 /** 1268 * @see #retrieveFromFile(int[]) 1269 */ 1270 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION", 1271 justification = "false positive, try-with-resources ensures close is called.") 1272 void persistToFile() throws IOException { 1273 LOG.debug("Thread {} started persisting bucket cache to file", 1274 Thread.currentThread().getName()); 1275 if (!isCachePersistent()) { 1276 throw new IOException("Attempt to persist non-persistent cache mappings!"); 1277 } 1278 File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime()); 1279 try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) { 1280 fos.write(ProtobufMagic.PB_MAGIC); 1281 BucketProtoUtils.toPB(this).writeDelimitedTo(fos); 1282 } catch (IOException e) { 1283 LOG.error("Failed to persist bucket cache to file", e); 1284 throw e; 1285 } 1286 LOG.debug("Thread {} finished persisting bucket cache to file, renaming", 1287 Thread.currentThread().getName()); 1288 if (!tempPersistencePath.renameTo(new File(persistencePath))) { 1289 LOG.warn("Failed to commit cache persistent file. We might lose cached blocks if " 1290 + "RS crashes/restarts before we successfully checkpoint again."); 1291 } 1292 } 1293 1294 public boolean isCachePersistent() { 1295 return ioEngine.isPersistent() && persistencePath != null; 1296 } 1297 1298 public Map<String, Long> getRegionCachedInfo() { 1299 return Collections.unmodifiableMap(regionCachedSizeMap); 1300 } 1301 1302 /** 1303 * @see #persistToFile() 1304 */ 1305 private void retrieveFromFile(int[] bucketSizes) throws IOException { 1306 LOG.info("Started retrieving bucket cache from file"); 1307 File persistenceFile = new File(persistencePath); 1308 if (!persistenceFile.exists()) { 1309 LOG.warn("Persistence file missing! " 1310 + "It's ok if it's first run after enabling persistent cache."); 1311 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1312 blockNumber.add(backingMap.size()); 1313 backingMapValidated.set(true); 1314 return; 1315 } 1316 assert !cacheEnabled; 1317 1318 try (FileInputStream in = new FileInputStream(persistenceFile)) { 1319 int pblen = ProtobufMagic.lengthOfPBMagic(); 1320 byte[] pbuf = new byte[pblen]; 1321 IOUtils.readFully(in, pbuf, 0, pblen); 1322 if (!ProtobufMagic.isPBMagicPrefix(pbuf)) { 1323 // In 3.0 we have enough flexibility to dump the old cache data. 1324 // TODO: In 2.x line, this might need to be filled in to support reading the old format 1325 throw new IOException( 1326 "Persistence file does not start with protobuf magic number. 
" + persistencePath); 1327 } 1328 parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in)); 1329 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1330 blockNumber.add(backingMap.size()); 1331 LOG.info("Bucket cache retrieved from file successfully"); 1332 } 1333 } 1334 1335 private void updateRegionSizeMapWhileRetrievingFromFile() { 1336 // Update the regionCachedSizeMap with the region size while restarting the region server 1337 if (LOG.isDebugEnabled()) { 1338 LOG.debug("Updating region size map after retrieving cached file list"); 1339 dumpPrefetchList(); 1340 } 1341 regionCachedSizeMap.clear(); 1342 fullyCachedFiles.forEach((hFileName, hFileSize) -> { 1343 // Get the region name for each file 1344 String regionEncodedName = hFileSize.getFirst(); 1345 long cachedFileSize = hFileSize.getSecond(); 1346 regionCachedSizeMap.merge(regionEncodedName, cachedFileSize, 1347 (oldpf, fileSize) -> oldpf + fileSize); 1348 }); 1349 } 1350 1351 private void dumpPrefetchList() { 1352 for (Map.Entry<String, Pair<String, Long>> outerEntry : fullyCachedFiles.entrySet()) { 1353 LOG.debug("Cached File Entry:<{},<{},{}>>", outerEntry.getKey(), 1354 outerEntry.getValue().getFirst(), outerEntry.getValue().getSecond()); 1355 } 1356 } 1357 1358 /** 1359 * Create an input stream that deletes the file after reading it. Use in try-with-resources to 1360 * avoid this pattern where an exception thrown from a finally block may mask earlier exceptions: 1361 * 1362 * <pre> 1363 * File f = ... 1364 * try (FileInputStream fis = new FileInputStream(f)) { 1365 * // use the input stream 1366 * } finally { 1367 * if (!f.delete()) throw new IOException("failed to delete"); 1368 * } 1369 * </pre> 1370 * 1371 * @param file the file to read and delete 1372 * @return a FileInputStream for the given file 1373 * @throws IOException if there is a problem creating the stream 1374 */ 1375 private FileInputStream deleteFileOnClose(final File file) throws IOException { 1376 return new FileInputStream(file) { 1377 private File myFile; 1378 1379 private FileInputStream init(File file) { 1380 myFile = file; 1381 return this; 1382 } 1383 1384 @Override 1385 public void close() throws IOException { 1386 // close() will be called during try-with-resources and it will be 1387 // called by finalizer thread during GC. To avoid double-free resource, 1388 // set myFile to null after the first call. 
1389 if (myFile == null) { 1390 return; 1391 } 1392 1393 super.close(); 1394 if (!myFile.delete()) { 1395 throw new IOException("Failed deleting persistence file " + myFile.getAbsolutePath()); 1396 } 1397 myFile = null; 1398 } 1399 }.init(file); 1400 } 1401 1402 private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) 1403 throws IOException { 1404 if (capacitySize != cacheCapacity) { 1405 throw new IOException("Mismatched cache capacity: " + StringUtils.byteDesc(capacitySize) 1406 + ", expected: " + StringUtils.byteDesc(cacheCapacity)); 1407 } 1408 if (!ioEngine.getClass().getName().equals(ioclass)) { 1409 throw new IOException("Class name for IO engine mismatch: " + ioclass + ", expected: " 1410 + ioEngine.getClass().getName()); 1411 } 1412 if (!backingMap.getClass().getName().equals(mapclass)) { 1413 throw new IOException("Class name for cache map mismatch: " + mapclass + ", expected: " 1414 + backingMap.getClass().getName()); 1415 } 1416 } 1417 1418 private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException { 1419 Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair = 1420 BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), 1421 this::createRecycler); 1422 backingMap = pair.getFirst(); 1423 blocksByHFile = pair.getSecond(); 1424 fullyCachedFiles.clear(); 1425 fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap())); 1426 if (proto.hasChecksum()) { 1427 try { 1428 ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), 1429 algorithm); 1430 backingMapValidated.set(true); 1431 } catch (IOException e) { 1432 LOG.warn("Checksum for cache file failed. " 1433 + "We need to validate each cache key in the backing map. " 1434 + "This may take some time, so we'll do it in a background thread."); 1435 Runnable cacheValidator = () -> { 1436 while (bucketAllocator == null) { 1437 try { 1438 Thread.sleep(50); 1439 } catch (InterruptedException ex) { 1440 throw new RuntimeException(ex); 1441 } 1442 } 1443 long startTime = EnvironmentEdgeManager.currentTime(); 1444 int totalKeysOriginally = backingMap.size(); 1445 for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) { 1446 try { 1447 ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue()); 1448 } catch (IOException e1) { 1449 LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey()); 1450 evictBlock(keyEntry.getKey()); 1451 removeFileFromPrefetch(keyEntry.getKey().getHfileName()); 1452 } 1453 } 1454 backingMapValidated.set(true); 1455 LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.", 1456 totalKeysOriginally, backingMap.size(), 1457 (EnvironmentEdgeManager.currentTime() - startTime)); 1458 }; 1459 Thread t = new Thread(cacheValidator); 1460 t.setDaemon(true); 1461 t.start(); 1462 } 1463 } else { 1464 // If there is no checksum, the persistence file is in the old format. 1465 LOG.info("Persistence file is in the old format; it does not support verifying file integrity!"); 1466 backingMapValidated.set(true); 1467 } 1468 updateRegionSizeMapWhileRetrievingFromFile(); 1469 verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); 1470 } 1471 1472 /** 1473 * Check whether we tolerate IO error this time.
If the IOEngine has been throwing errors for longer 1474 * than ioErrorsTolerationDuration, we will disable the cache 1475 */ 1476 private void checkIOErrorIsTolerated() { 1477 long now = EnvironmentEdgeManager.currentTime(); 1478 // Do a single read to a local variable to avoid timing issue - HBASE-24454 1479 long ioErrorStartTimeTmp = this.ioErrorStartTime; 1480 if (ioErrorStartTimeTmp > 0) { 1481 if (cacheEnabled && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) { 1482 LOG.error("IO errors have lasted longer than " + ioErrorsTolerationDuration 1483 + "ms, disabling cache, please check your IOEngine"); 1484 disableCache(); 1485 } 1486 } else { 1487 this.ioErrorStartTime = now; 1488 } 1489 } 1490 1491 /** 1492 * Used to shut down the cache, or to turn it off if something is broken. 1493 */ 1494 private void disableCache() { 1495 if (!cacheEnabled) return; 1496 LOG.info("Disabling cache"); 1497 cacheEnabled = false; 1498 ioEngine.shutdown(); 1499 this.scheduleThreadPool.shutdown(); 1500 for (int i = 0; i < writerThreads.length; ++i) 1501 writerThreads[i].interrupt(); 1502 this.ramCache.clear(); 1503 if (!ioEngine.isPersistent() || persistencePath == null) { 1504 // Only a persistent ioengine with a persistence path serializes out the backingMap; otherwise clear the in-memory state here. 1505 this.backingMap.clear(); 1506 this.blocksByHFile.clear(); 1507 this.fullyCachedFiles.clear(); 1508 this.regionCachedSizeMap.clear(); 1509 } 1510 } 1511 1512 private void join() throws InterruptedException { 1513 for (int i = 0; i < writerThreads.length; ++i) 1514 writerThreads[i].join(); 1515 } 1516 1517 @Override 1518 public void shutdown() { 1519 disableCache(); 1520 LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write=" 1521 + persistencePath); 1522 if (ioEngine.isPersistent() && persistencePath != null) { 1523 try { 1524 join(); 1525 if (cachePersister != null) { 1526 LOG.info("Shutting down cache persister thread."); 1527 cachePersister.shutdown(); 1528 while (cachePersister.isAlive()) { 1529 Thread.sleep(10); 1530 } 1531 } 1532 persistToFile(); 1533 } catch (IOException ex) { 1534 LOG.error("Unable to persist data on exit: " + ex.toString(), ex); 1535 } catch (InterruptedException e) { 1536 LOG.warn("Failed to persist data on exit", e); 1537 } 1538 } 1539 } 1540
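  /*
   * The shutdown path above only checkpoints the backing map when a persistent IOEngine and a
   * persistence path are configured. A minimal configuration sketch for that setup (the paths and
   * sizes below are illustrative, not defaults) could look like:
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   // Back the bucket cache with a local file so cached blocks survive a restart.
   *   conf.set("hbase.bucketcache.ioengine", "file:/mnt/ssd/bucketcache.data");
   *   conf.setInt("hbase.bucketcache.size", 8192); // cache capacity, in MB
   *   // Where the cache metadata (the serialized backing map) is checkpointed on shutdown.
   *   conf.set("hbase.bucketcache.persistent.path", "/mnt/ssd/bucketcache.meta");
   */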
1541 /** 1542 * Needed mostly for UTs that might run in the same VM and create different BucketCache instances 1543 * on different UT methods. 1544 */ 1545 @Override 1546 protected void finalize() { 1547 if (cachePersister != null && !cachePersister.isInterrupted()) { 1548 cachePersister.interrupt(); 1549 } 1550 } 1551 1552 @Override 1553 public CacheStats getStats() { 1554 return cacheStats; 1555 } 1556 1557 public BucketAllocator getAllocator() { 1558 return this.bucketAllocator; 1559 } 1560 1561 @Override 1562 public long heapSize() { 1563 return this.heapSize.sum(); 1564 } 1565 1566 @Override 1567 public long size() { 1568 return this.realCacheSize.sum(); 1569 } 1570 1571 @Override 1572 public long getCurrentDataSize() { 1573 return size(); 1574 } 1575 1576 @Override 1577 public long getFreeSize() { 1578 return this.bucketAllocator.getFreeSize(); 1579 } 1580 1581 @Override 1582 public long getBlockCount() { 1583 return this.blockNumber.sum(); 1584 } 1585 1586 @Override 1587 public long getDataBlockCount() { 1588 return getBlockCount(); 1589 } 1590 1591 @Override 1592 public long getCurrentSize() { 1593 return this.bucketAllocator.getUsedSize(); 1594 } 1595 1596 protected String getAlgorithm() { 1597 return algorithm; 1598 } 1599 1600 /** 1601 * Evicts all blocks for a specific HFile. 1602 * <p> 1603 * This is used for evict-on-close to remove all blocks of a specific HFile. 1604 * @return the number of blocks evicted 1605 */ 1606 @Override 1607 public int evictBlocksByHfileName(String hfileName) { 1608 removeFileFromPrefetch(hfileName); 1609 Set<BlockCacheKey> keySet = getAllCacheKeysForFile(hfileName); 1610 int numEvicted = 0; 1611 for (BlockCacheKey key : keySet) { 1612 if (evictBlock(key)) { 1613 ++numEvicted; 1614 } 1615 } 1616 return numEvicted; 1617 } 1618 1619 private Set<BlockCacheKey> getAllCacheKeysForFile(String hfileName) { 1620 return blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE), true, 1621 new BlockCacheKey(hfileName, Long.MAX_VALUE), true); 1622 } 1623 1624 /** 1625 * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each 1626 * priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate 1627 * number of elements out of each according to configuration parameters and their relative sizes. (A simplified sketch of an eviction pass using these groups follows this class.) 1628 */ 1629 private class BucketEntryGroup { 1630 1631 private CachedEntryQueue queue; 1632 private long totalSize = 0; 1633 private long bucketSize; 1634 1635 public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) { 1636 this.bucketSize = bucketSize; 1637 queue = new CachedEntryQueue(bytesToFree, blockSize); 1638 totalSize = 0; 1639 } 1640 1641 public void add(Map.Entry<BlockCacheKey, BucketEntry> block) { 1642 totalSize += block.getValue().getLength(); 1643 queue.add(block); 1644 } 1645 1646 public long free(long toFree) { 1647 Map.Entry<BlockCacheKey, BucketEntry> entry; 1648 long freedBytes = 0; 1649 // TODO avoid a cycling situation: we may find no block that is not in use, and so no way to free space. 1650 // What to do then? Fail the caching attempt? This may need changes in the cacheBlock API. 1651 while ((entry = queue.pollLast()) != null) { 1652 BlockCacheKey blockCacheKey = entry.getKey(); 1653 BucketEntry be = entry.getValue(); 1654 if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) { 1655 freedBytes += be.getLength(); 1656 } 1657 if (freedBytes >= toFree) { 1658 return freedBytes; 1659 } 1660 } 1661 return freedBytes; 1662 } 1663 1664 public long overflow() { 1665 return totalSize - bucketSize; 1666 } 1667 1668 public long totalSize() { 1669 return totalSize; 1670 } 1671 } 1672
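  /*
   * A simplified sketch of how these priority groups can drive an eviction pass. It is an
   * illustration only: the sizes below are made up, and the real pass (the freeSpace logic
   * elsewhere in this class) additionally honors the extra-free factor and skips blocks still
   * referenced by in-flight RPCs.
   *
   *   long bytesToFree = 1024 * 1024;          // hypothetical amount to reclaim
   *   long blockSize = 64 * 1024;              // hypothetical average block size
   *   long partitionSize = 100 * 1024 * 1024;  // hypothetical share for each priority
   *   BucketEntryGroup single = new BucketEntryGroup(bytesToFree, blockSize, partitionSize);
   *   BucketEntryGroup multi = new BucketEntryGroup(bytesToFree, blockSize, partitionSize);
   *   BucketEntryGroup memory = new BucketEntryGroup(bytesToFree, blockSize, partitionSize);
   *   for (Map.Entry<BlockCacheKey, BucketEntry> e : backingMap.entrySet()) {
   *     switch (e.getValue().getPriority()) {
   *       case SINGLE: single.add(e); break;
   *       case MULTI: multi.add(e); break;
   *       case MEMORY: memory.add(e); break;
   *     }
   *   }
   *   // Always free from the group that most overflows its share, until enough bytes are freed.
   *   PriorityQueue<BucketEntryGroup> groups =
   *     new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow).reversed());
   *   groups.add(single); groups.add(multi); groups.add(memory);
   *   long freed = 0;
   *   while (freed < bytesToFree && !groups.isEmpty()) {
   *     freed += groups.poll().free(bytesToFree - freed);
   *   }
   */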
1673 /** 1674 * Block entry stored in memory, holding the key, the data and related bookkeeping. 1675 */ 1676 static class RAMQueueEntry { 1677 private final BlockCacheKey key; 1678 private final Cacheable data; 1679 private long accessCounter; 1680 private boolean inMemory; 1681 private boolean isCachePersistent; 1682 1683 RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory, 1684 boolean isCachePersistent) { 1685 this.key = bck; 1686 this.data = data; 1687 this.accessCounter = accessCounter; 1688 this.inMemory = inMemory; 1689 this.isCachePersistent = isCachePersistent; 1690 } 1691 1692 public Cacheable getData() { 1693 return data; 1694 } 1695 1696 public BlockCacheKey getKey() { 1697 return key; 1698 } 1699 1700 public void access(long accessCounter) { 1701 this.accessCounter = accessCounter; 1702 } 1703 1704 private ByteBuffAllocator getByteBuffAllocator() { 1705 if (data instanceof HFileBlock) { 1706 return ((HFileBlock) data).getByteBuffAllocator(); 1707 } 1708 return ByteBuffAllocator.HEAP; 1709 } 1710 1711 public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc, 1712 final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler, 1713 ByteBuffer metaBuff) throws IOException { 1714 int len = data.getSerializedLength(); 1715 // This cacheable cannot be serialized 1716 if (len == 0) { 1717 return null; 1718 } 1719 if (isCachePersistent && data instanceof HFileBlock) { 1720 len += Long.BYTES; // we need to record the cache time for the consistency check in case of 1721 // recovery 1722 } 1723 long offset = alloc.allocateBlock(len); 1724 boolean succ = false; 1725 BucketEntry bucketEntry = null; 1726 try { 1727 int diskSizeWithHeader = (data instanceof HFileBlock) 1728 ? ((HFileBlock) data).getOnDiskSizeWithHeader() 1729 : data.getSerializedLength(); 1730 bucketEntry = new BucketEntry(offset, len, diskSizeWithHeader, accessCounter, inMemory, 1731 createRecycler, getByteBuffAllocator()); 1732 bucketEntry.setDeserializerReference(data.getDeserializer()); 1733 if (data instanceof HFileBlock) { 1734 // If an instance of HFileBlock, save on some allocations. 1735 HFileBlock block = (HFileBlock) data; 1736 ByteBuff sliceBuf = block.getBufferReadOnly(); 1737 block.getMetaData(metaBuff); 1738 // Write the cache time ahead of the block and metadata, for the recovery consistency check. 1739 if (isCachePersistent) { 1740 ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); 1741 buffer.putLong(bucketEntry.getCachedTime()); 1742 buffer.rewind(); 1743 ioEngine.write(buffer, offset); 1744 ioEngine.write(sliceBuf, (offset + Long.BYTES)); 1745 } else { 1746 ioEngine.write(sliceBuf, offset); 1747 } 1748 ioEngine.write(metaBuff, offset + len - metaBuff.limit()); 1749 } else { 1750 // Only used for testing.
1751 ByteBuffer bb = ByteBuffer.allocate(len); 1752 data.serialize(bb, true); 1753 ioEngine.write(bb, offset); 1754 } 1755 succ = true; 1756 } finally { 1757 if (!succ) { 1758 alloc.freeBlock(offset, len); 1759 } 1760 } 1761 realCacheSize.add(len); 1762 return bucketEntry; 1763 } 1764 } 1765 1766 /** 1767 * Only used in test 1768 */ 1769 void stopWriterThreads() throws InterruptedException { 1770 for (WriterThread writerThread : writerThreads) { 1771 writerThread.disableWriter(); 1772 writerThread.interrupt(); 1773 writerThread.join(); 1774 } 1775 } 1776 1777 @Override 1778 public Iterator<CachedBlock> iterator() { 1779 // Don't bother with ramcache since stuff is in here only a little while. 1780 final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = this.backingMap.entrySet().iterator(); 1781 return new Iterator<CachedBlock>() { 1782 private final long now = System.nanoTime(); 1783 1784 @Override 1785 public boolean hasNext() { 1786 return i.hasNext(); 1787 } 1788 1789 @Override 1790 public CachedBlock next() { 1791 final Map.Entry<BlockCacheKey, BucketEntry> e = i.next(); 1792 return new CachedBlock() { 1793 @Override 1794 public String toString() { 1795 return BlockCacheUtil.toString(this, now); 1796 } 1797 1798 @Override 1799 public BlockPriority getBlockPriority() { 1800 return e.getValue().getPriority(); 1801 } 1802 1803 @Override 1804 public BlockType getBlockType() { 1805 // Not held by BucketEntry. Could add it if wanted on BucketEntry creation. 1806 return null; 1807 } 1808 1809 @Override 1810 public long getOffset() { 1811 return e.getKey().getOffset(); 1812 } 1813 1814 @Override 1815 public long getSize() { 1816 return e.getValue().getLength(); 1817 } 1818 1819 @Override 1820 public long getCachedTime() { 1821 return e.getValue().getCachedTime(); 1822 } 1823 1824 @Override 1825 public String getFilename() { 1826 return e.getKey().getHfileName(); 1827 } 1828 1829 @Override 1830 public int compareTo(CachedBlock other) { 1831 int diff = this.getFilename().compareTo(other.getFilename()); 1832 if (diff != 0) return diff; 1833 1834 diff = Long.compare(this.getOffset(), other.getOffset()); 1835 if (diff != 0) return diff; 1836 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { 1837 throw new IllegalStateException( 1838 "" + this.getCachedTime() + ", " + other.getCachedTime()); 1839 } 1840 return Long.compare(other.getCachedTime(), this.getCachedTime()); 1841 } 1842 1843 @Override 1844 public int hashCode() { 1845 return e.getKey().hashCode(); 1846 } 1847 1848 @Override 1849 public boolean equals(Object obj) { 1850 if (obj instanceof CachedBlock) { 1851 CachedBlock cb = (CachedBlock) obj; 1852 return compareTo(cb) == 0; 1853 } else { 1854 return false; 1855 } 1856 } 1857 }; 1858 } 1859 1860 @Override 1861 public void remove() { 1862 throw new UnsupportedOperationException(); 1863 } 1864 }; 1865 } 1866 1867 @Override 1868 public BlockCache[] getBlockCaches() { 1869 return null; 1870 } 1871 1872 public int getRpcRefCount(BlockCacheKey cacheKey) { 1873 BucketEntry bucketEntry = backingMap.get(cacheKey); 1874 if (bucketEntry != null) { 1875 return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 
0 : 1); 1876 } 1877 return 0; 1878 } 1879 1880 float getAcceptableFactor() { 1881 return acceptableFactor; 1882 } 1883 1884 float getMinFactor() { 1885 return minFactor; 1886 } 1887 1888 float getExtraFreeFactor() { 1889 return extraFreeFactor; 1890 } 1891 1892 float getSingleFactor() { 1893 return singleFactor; 1894 } 1895 1896 float getMultiFactor() { 1897 return multiFactor; 1898 } 1899 1900 float getMemoryFactor() { 1901 return memoryFactor; 1902 } 1903 1904 public String getPersistencePath() { 1905 return persistencePath; 1906 } 1907 1908 /** 1909 * Wraps the delegate ConcurrentMap while maintaining the reference count of each block. 1910 */ 1911 static class RAMCache { 1912 /** 1913 * The map is declared as a {@link ConcurrentHashMap} explicitly here because in 1914 * {@link RAMCache#get(BlockCacheKey)} and 1915 * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)} we need to guarantee 1916 * the atomicity of map#computeIfPresent(key, func) and map#computeIfAbsent(key, func): the 1917 * func must execute exactly once, only when the key is present (or absent), and under the 1918 * per-key lock. Otherwise the reference count of the block can be corrupted. Note that 1919 * {@link java.util.concurrent.ConcurrentSkipListMap} cannot guarantee that. 1920 */ 1921 final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>(); 1922 1923 public boolean containsKey(BlockCacheKey key) { 1924 return delegate.containsKey(key); 1925 } 1926 1927 public RAMQueueEntry get(BlockCacheKey key) { 1928 return delegate.computeIfPresent(key, (k, re) -> { 1929 // It'll be referenced by an RPC, so retain atomically here. If the get and retain were not 1930 // atomic, another thread could remove and release the block; retaining in this thread might 1931 // then retain a block with refCnt=0, which is disallowed (see HBASE-22422). 1932 re.getData().retain(); 1933 return re; 1934 }); 1935 } 1936 1937 /** 1938 * Returns the previously associated value, or null if absent. It has the same meaning as 1939 * {@link ConcurrentMap#putIfAbsent(Object, Object)} 1940 */ 1941 public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) { 1942 AtomicBoolean absent = new AtomicBoolean(false); 1943 RAMQueueEntry re = delegate.computeIfAbsent(key, k -> { 1944 // The RAMCache now references this entry, so its reference count should be incremented. 1945 entry.getData().retain(); 1946 absent.set(true); 1947 return entry; 1948 }); 1949 return absent.get() ? null : re; 1950 } 1951 1952 public boolean remove(BlockCacheKey key) { 1953 return remove(key, re -> { 1954 }); 1955 } 1956 1957 /** 1958 * A {@link Consumer} is taken here because once the removed entry releases its reference count, 1959 * its ByteBuffers may be recycled, and accessing the entry outside this method would throw an 1960 * exception. The consumer accesses the entry to be removed before its reference count is released.
1961 * Note: do not change the entry's reference count inside the {@link Consumer}. 1962 */ 1963 public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) { 1964 RAMQueueEntry previous = delegate.remove(key); 1965 action.accept(previous); 1966 if (previous != null) { 1967 previous.getData().release(); 1968 } 1969 return previous != null; 1970 } 1971 1972 public boolean isEmpty() { 1973 return delegate.isEmpty(); 1974 } 1975 1976 public void clear() { 1977 Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator(); 1978 while (it.hasNext()) { 1979 RAMQueueEntry re = it.next().getValue(); 1980 it.remove(); 1981 re.getData().release(); 1982 } 1983 } 1984 1985 public boolean hasBlocksForFile(String fileName) { 1986 return delegate.keySet().stream().filter(key -> key.getHfileName().equals(fileName)) 1987 .findFirst().isPresent(); 1988 } 1989 } 1990
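  /*
   * To make the reference-counting contract above concrete, here is an illustrative sketch of why
   * RAMCache#get retains under ConcurrentHashMap#computeIfPresent instead of doing a plain lookup
   * followed by a retain:
   *
   *   // Unsafe: between get() and retain() another thread may remove the entry and release it,
   *   // so retain() could run on a block whose refCnt has already dropped to 0.
   *   RAMQueueEntry unsafe = delegate.get(key);
   *   if (unsafe != null) {
   *     unsafe.getData().retain();
   *   }
   *
   *   // Safe: computeIfPresent runs the function under the per-key lock, so no concurrent
   *   // remove/release can slip in between the lookup and the retain.
   *   RAMQueueEntry safe = delegate.computeIfPresent(key, (k, v) -> {
   *     v.getData().retain();
   *     return v;
   *   });
   */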
1991 public Map<BlockCacheKey, BucketEntry> getBackingMap() { 1992 return backingMap; 1993 } 1994 1995 public AtomicBoolean getBackingMapValidated() { 1996 return backingMapValidated; 1997 } 1998 1999 @Override 2000 public Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() { 2001 return Optional.of(fullyCachedFiles); 2002 } 2003 2004 public static Optional<BucketCache> getBucketCacheFromCacheConfig(CacheConfig cacheConf) { 2005 if (cacheConf.getBlockCache().isPresent()) { 2006 BlockCache bc = cacheConf.getBlockCache().get(); 2007 if (bc instanceof CombinedBlockCache) { 2008 BlockCache l2 = ((CombinedBlockCache) bc).getSecondLevelCache(); 2009 if (l2 instanceof BucketCache) { 2010 return Optional.of((BucketCache) l2); 2011 } 2012 } else if (bc instanceof BucketCache) { 2013 return Optional.of((BucketCache) bc); 2014 } 2015 } 2016 return Optional.empty(); 2017 } 2018 2019 @Override 2020 public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int dataBlockCount, 2021 long size) { 2022 // block eviction may be happening in the background as prefetch runs, 2023 // so we need to count all blocks for this file in the backing map under 2024 // a read lock for the block offset 2025 final List<ReentrantReadWriteLock> locks = new ArrayList<>(); 2026 LOG.debug("Notifying caching completed for file {}, with total blocks {}, and data blocks {}", 2027 fileName, totalBlockCount, dataBlockCount); 2028 try { 2029 final MutableInt count = new MutableInt(); 2030 LOG.debug("iterating over {} entries in the backing map", backingMap.size()); 2031 backingMap.entrySet().stream().forEach(entry -> { 2032 if ( 2033 entry.getKey().getHfileName().equals(fileName.getName()) 2034 && entry.getKey().getBlockType().equals(BlockType.DATA) 2035 ) { 2036 long offsetToLock = entry.getValue().offset(); 2037 LOG.debug("found block {} in the backing map. Acquiring read lock for offset {}", 2038 entry.getKey(), offsetToLock); 2039 ReentrantReadWriteLock lock = offsetLock.getLock(offsetToLock); 2040 lock.readLock().lock(); 2041 locks.add(lock); 2042 // Recheck that the key is still there (no eviction happened before the lock was acquired) 2043 if (backingMap.containsKey(entry.getKey())) { 2044 count.increment(); 2045 } else { 2046 lock.readLock().unlock(); 2047 locks.remove(lock); 2048 LOG.debug("found block {}, but when locked and tried to count, it was gone.", entry.getKey()); 2049 } 2050 } 2051 }); 2052 int metaCount = totalBlockCount - dataBlockCount; 2053 // BucketCache would only have data blocks 2054 if (dataBlockCount == count.getValue()) { 2055 LOG.debug("File {} has now been fully cached.", fileName); 2056 fileCacheCompleted(fileName, size); 2057 } else { 2058 LOG.debug( 2059 "Prefetch executor completed for {}, but only {} data blocks were cached. " 2060 + "Total data blocks for file: {}. " 2061 + "Checking for blocks pending cache in cache writer queue.", 2062 fileName, count.getValue(), dataBlockCount); 2063 if (ramCache.hasBlocksForFile(fileName.getName())) { 2064 for (ReentrantReadWriteLock lock : locks) { 2065 lock.readLock().unlock(); 2066 } 2067 LOG.debug("There are still blocks pending caching for file {}. Will sleep 100ms " 2068 + "and try the verification again.", fileName.getName()); 2069 Thread.sleep(100); 2070 notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size); 2071 } else 2072 if ((getAllCacheKeysForFile(fileName.getName()).size() - metaCount) == dataBlockCount) { 2073 LOG.debug("We counted {} data blocks, expected {}. Nothing else is pending in " 2074 + "the cache write queue, but the total cached blocks for file {} " 2075 + "now equals the data block count.", count, dataBlockCount, fileName.getName()); 2076 fileCacheCompleted(fileName, size); 2077 } else { 2078 LOG.info("We found only {} data blocks cached from a total of {} for file {}, " 2079 + "but no blocks pending caching. 
Maybe cache is full or evictions " 2080 + "happened concurrently to cache prefetch.", count, dataBlockCount, fileName); 2081 } 2082 } 2083 } catch (InterruptedException e) { 2084 throw new RuntimeException(e); 2085 } finally { 2086 for (ReentrantReadWriteLock lock : locks) { 2087 lock.readLock().unlock(); 2088 } 2089 } 2090 } 2091 2092 @Override 2093 public void notifyFileBlockEvicted(String fileName) { 2094 removeFileFromPrefetch(fileName); 2095 } 2096 2097 @Override 2098 public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) { 2099 long currentUsed = bucketAllocator.getUsedSize(); 2100 boolean result = (currentUsed + block.getOnDiskSizeWithHeader()) < acceptableSize(); 2101 return Optional.of(result); 2102 } 2103 2104 @Override 2105 public Optional<Boolean> shouldCacheFile(String fileName) { 2106 // if we don't have the file in fullyCachedFiles, we should cache it 2107 return Optional.of(!fullyCachedFiles.containsKey(fileName)); 2108 } 2109 2110 @Override 2111 public Optional<Boolean> isAlreadyCached(BlockCacheKey key) { 2112 return Optional.of(getBackingMap().containsKey(key)); 2113 } 2114 2115 @Override 2116 public Optional<Integer> getBlockSize(BlockCacheKey key) { 2117 BucketEntry entry = backingMap.get(key); 2118 if (entry == null) { 2119 return Optional.empty(); 2120 } else { 2121 return Optional.of(entry.getOnDiskSizeWithHeader()); 2122 } 2123 2124 } 2125 2126 private void removeFileFromPrefetch(String hfileName) { 2127 // Update the regionPrefetchedSizeMap before removing the file from prefetchCompleted 2128 if (fullyCachedFiles.containsKey(hfileName)) { 2129 Pair<String, Long> regionEntry = fullyCachedFiles.get(hfileName); 2130 String regionEncodedName = regionEntry.getFirst(); 2131 long filePrefetchSize = regionEntry.getSecond(); 2132 LOG.debug("Removing file {} for region {}", hfileName, regionEncodedName); 2133 regionCachedSizeMap.computeIfPresent(regionEncodedName, (rn, pf) -> pf - filePrefetchSize); 2134 // If all the blocks for a region are evicted from the cache, remove the entry for that region 2135 if ( 2136 regionCachedSizeMap.containsKey(regionEncodedName) 2137 && regionCachedSizeMap.get(regionEncodedName) == 0 2138 ) { 2139 regionCachedSizeMap.remove(regionEncodedName); 2140 } 2141 } 2142 fullyCachedFiles.remove(hfileName); 2143 } 2144 2145 public void fileCacheCompleted(Path filePath, long size) { 2146 Pair<String, Long> pair = new Pair<>(); 2147 // sets the region name 2148 String regionName = filePath.getParent().getParent().getName(); 2149 pair.setFirst(regionName); 2150 pair.setSecond(size); 2151 fullyCachedFiles.put(filePath.getName(), pair); 2152 regionCachedSizeMap.merge(regionName, size, (oldpf, fileSize) -> oldpf + fileSize); 2153 } 2154 2155}
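/*
 * A minimal usage sketch (hypothetical values) for the accessors defined above: obtaining the
 * BucketCache sitting behind a CacheConfig and inspecting its persistence state.
 *
 *   Optional<BucketCache> bucketCache = BucketCache.getBucketCacheFromCacheConfig(cacheConf);
 *   bucketCache.ifPresent(cache -> {
 *     if (cache.isCachePersistent()) {
 *       LOG.info("Bucket cache metadata is checkpointed at {}", cache.getPersistencePath());
 *     }
 *     LOG.info("Cached bytes per region: {}", cache.getRegionCachedInfo());
 *   });
 */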