001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; 021 022import java.io.File; 023import java.io.FileInputStream; 024import java.io.FileOutputStream; 025import java.io.IOException; 026import java.nio.ByteBuffer; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.Comparator; 031import java.util.HashSet; 032import java.util.Iterator; 033import java.util.List; 034import java.util.Map; 035import java.util.NavigableSet; 036import java.util.Optional; 037import java.util.PriorityQueue; 038import java.util.Set; 039import java.util.concurrent.ArrayBlockingQueue; 040import java.util.concurrent.BlockingQueue; 041import java.util.concurrent.ConcurrentHashMap; 042import java.util.concurrent.ConcurrentMap; 043import java.util.concurrent.ConcurrentSkipListSet; 044import java.util.concurrent.Executors; 045import java.util.concurrent.ScheduledExecutorService; 046import java.util.concurrent.TimeUnit; 047import java.util.concurrent.atomic.AtomicBoolean; 048import java.util.concurrent.atomic.AtomicLong; 049import java.util.concurrent.atomic.LongAdder; 050import java.util.concurrent.locks.Lock; 051import java.util.concurrent.locks.ReentrantLock; 052import java.util.concurrent.locks.ReentrantReadWriteLock; 053import java.util.function.Consumer; 054import java.util.function.Function; 055import org.apache.commons.io.IOUtils; 056import org.apache.commons.lang3.mutable.MutableInt; 057import org.apache.hadoop.conf.Configuration; 058import org.apache.hadoop.fs.Path; 059import org.apache.hadoop.hbase.HBaseConfiguration; 060import org.apache.hadoop.hbase.HBaseIOException; 061import org.apache.hadoop.hbase.TableName; 062import org.apache.hadoop.hbase.client.Admin; 063import org.apache.hadoop.hbase.io.ByteBuffAllocator; 064import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 065import org.apache.hadoop.hbase.io.HeapSize; 066import org.apache.hadoop.hbase.io.hfile.BlockCache; 067import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 068import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; 069import org.apache.hadoop.hbase.io.hfile.BlockPriority; 070import org.apache.hadoop.hbase.io.hfile.BlockType; 071import org.apache.hadoop.hbase.io.hfile.CacheConfig; 072import org.apache.hadoop.hbase.io.hfile.CacheStats; 073import org.apache.hadoop.hbase.io.hfile.Cacheable; 074import org.apache.hadoop.hbase.io.hfile.CachedBlock; 075import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; 076import org.apache.hadoop.hbase.io.hfile.HFileBlock; 077import org.apache.hadoop.hbase.io.hfile.HFileContext; 078import 
org.apache.hadoop.hbase.nio.ByteBuff; 079import org.apache.hadoop.hbase.nio.RefCnt; 080import org.apache.hadoop.hbase.protobuf.ProtobufMagic; 081import org.apache.hadoop.hbase.regionserver.HRegion; 082import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 083import org.apache.hadoop.hbase.util.Bytes; 084import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 085import org.apache.hadoop.hbase.util.IdReadWriteLock; 086import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef; 087import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool; 088import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType; 089import org.apache.hadoop.hbase.util.Pair; 090import org.apache.hadoop.util.StringUtils; 091import org.apache.yetus.audience.InterfaceAudience; 092import org.slf4j.Logger; 093import org.slf4j.LoggerFactory; 094 095import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 096import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 097 098import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos; 099 100/** 101 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses BucketCache#ramCache 102 * and BucketCache#backingMap in order to determine if a given element is in the cache. The bucket 103 * cache can use off-heap memory {@link ByteBufferIOEngine} or mmap 104 * {@link ExclusiveMemoryMmapIOEngine} or pmem {@link SharedMemoryMmapIOEngine} or local files 105 * {@link FileIOEngine} to store/read the block data. 106 * <p> 107 * Eviction is via a similar algorithm as used in 108 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache} 109 * <p> 110 * BucketCache can be used as mainly a block cache (see 111 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with a BlockCache to 112 * decrease CMS GC and heap fragmentation. 113 * <p> 114 * It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to 115 * enlarge cache space via a victim cache. 116 */ 117@InterfaceAudience.Private 118public class BucketCache implements BlockCache, HeapSize { 119 private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class); 120 121 /** Priority buckets config */ 122 static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor"; 123 static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor"; 124 static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor"; 125 static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor"; 126 static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor"; 127 static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor"; 128 static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE = 129 "hbase.bucketcache.persistence.chunksize"; 130 131 /** Use strong reference for offsetLock or not */ 132 private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref"; 133 private static final boolean STRONG_REF_DEFAULT = false; 134 135 /** The cache age of blocks to check if the related file is present on any online regions. 
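 * Blocks that have been cached for longer than this grace period and whose file no longer belongs
 * to any region online on this server are treated as orphan blocks and are evicted first when
 * freeSpace runs.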
*/ 136 static final String BLOCK_ORPHAN_GRACE_PERIOD = 137 "hbase.bucketcache.block.orphan.evictgraceperiod.seconds"; 138 139 static final long BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT = 24 * 60 * 60 * 1000L; 140 141 /** Priority buckets */ 142 static final float DEFAULT_SINGLE_FACTOR = 0.25f; 143 static final float DEFAULT_MULTI_FACTOR = 0.50f; 144 static final float DEFAULT_MEMORY_FACTOR = 0.25f; 145 static final float DEFAULT_MIN_FACTOR = 0.85f; 146 147 static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f; 148 static final float DEFAULT_ACCEPT_FACTOR = 0.95f; 149 150 // Number of blocks to clear for each of the bucket size that is full 151 static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2; 152 153 /** Statistics thread */ 154 private static final int statThreadPeriod = 5 * 60; 155 156 final static int DEFAULT_WRITER_THREADS = 3; 157 final static int DEFAULT_WRITER_QUEUE_ITEMS = 64; 158 159 final static long DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE = 10000; 160 161 // Store/read block data 162 transient final IOEngine ioEngine; 163 164 // Store the block in this map before writing it to cache 165 transient final RAMCache ramCache; 166 167 // In this map, store the block's meta data like offset, length 168 transient Map<BlockCacheKey, BucketEntry> backingMap; 169 170 private AtomicBoolean backingMapValidated = new AtomicBoolean(false); 171 172 /** 173 * Map of hFile -> Region -> File size. This map is used to track all files completed prefetch, 174 * together with the region those belong to and the total cached size for the 175 * region.TestBlockEvictionOnRegionMovement 176 */ 177 transient final Map<String, Pair<String, Long>> fullyCachedFiles = new ConcurrentHashMap<>(); 178 /** 179 * Map of region -> total size of the region prefetched on this region server. This is the total 180 * size of hFiles for this region prefetched on this region server 181 */ 182 final Map<String, Long> regionCachedSize = new ConcurrentHashMap<>(); 183 184 private transient BucketCachePersister cachePersister; 185 186 /** 187 * Enum to represent the state of cache 188 */ 189 protected enum CacheState { 190 // Initializing: State when the cache is being initialised from persistence. 191 INITIALIZING, 192 // Enabled: State when cache is initialised and is ready. 193 ENABLED, 194 // Disabled: State when the cache is disabled. 195 DISABLED 196 } 197 198 /** 199 * Flag if the cache is enabled or not... We shut it off if there are IO errors for some time, so 200 * that Bucket IO exceptions/errors don't bring down the HBase server. 201 */ 202 private volatile CacheState cacheState; 203 204 /** 205 * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other 206 * words, the work adding blocks to the BucketCache is divided up amongst the running 207 * WriterThreads. Its done by taking hash of the cache key modulo queue count. WriterThread when 208 * it runs takes whatever has been recently added and 'drains' the entries to the BucketCache. It 209 * then updates the ramCache and backingMap accordingly. 
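 * The queue for a given key is selected as {@code (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size()}.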
210 */ 211 transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>(); 212 transient final WriterThread[] writerThreads; 213 214 /** Volatile boolean to track if free space is in process or not */ 215 private volatile boolean freeInProgress = false; 216 private transient final Lock freeSpaceLock = new ReentrantLock(); 217 218 private final LongAdder realCacheSize = new LongAdder(); 219 private final LongAdder heapSize = new LongAdder(); 220 /** Current number of cached elements */ 221 private final LongAdder blockNumber = new LongAdder(); 222 223 /** Cache access count (sequential ID) */ 224 private final AtomicLong accessCount = new AtomicLong(); 225 226 private static final int DEFAULT_CACHE_WAIT_TIME = 50; 227 228 private final BucketCacheStats cacheStats = new BucketCacheStats(); 229 private final String persistencePath; 230 static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false); 231 private final long cacheCapacity; 232 /** Approximate block size */ 233 private final long blockSize; 234 235 /** Duration of IO errors tolerated before we disable cache, 1 min as default */ 236 private final int ioErrorsTolerationDuration; 237 // 1 min 238 public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000; 239 240 // Start time of first IO error when reading or writing IO Engine, it will be 241 // reset after a successful read/write. 242 private volatile long ioErrorStartTime = -1; 243 244 private transient Configuration conf; 245 246 /** 247 * A ReentrantReadWriteLock to lock on a particular block identified by offset. The purpose of 248 * this is to avoid freeing the block which is being read. 249 * <p> 250 */ 251 transient final IdReadWriteLock<Long> offsetLock; 252 253 transient NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>( 254 Comparator.comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset)); 255 256 /** Statistics thread schedule pool (for heavy debugging, could remove) */ 257 private transient final ScheduledExecutorService scheduleThreadPool = 258 Executors.newScheduledThreadPool(1, 259 new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build()); 260 261 // Allocate or free space for the block 262 private transient BucketAllocator bucketAllocator; 263 264 /** Acceptable size of cache (no evictions if size < acceptable) */ 265 private float acceptableFactor; 266 267 /** Minimum threshold of cache (when evicting, evict until size < min) */ 268 private float minFactor; 269 270 /** 271 * Free this floating point factor of extra blocks when evicting. 
For example free the number of 272 * blocks requested * (1 + extraFreeFactor) 273 */ 274 private float extraFreeFactor; 275 276 /** Single access bucket size */ 277 private float singleFactor; 278 279 /** Multiple access bucket size */ 280 private float multiFactor; 281 282 /** In-memory bucket size */ 283 private float memoryFactor; 284 285 private long bucketcachePersistInterval; 286 287 private static final String FILE_VERIFY_ALGORITHM = 288 "hbase.bucketcache.persistent.file.integrity.check.algorithm"; 289 private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5"; 290 291 public static final String QUEUE_ADDITION_WAIT_TIME = "hbase.bucketcache.queue.addition.waittime"; 292 static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0; 293 private long queueAdditionWaitTime; 294 /** 295 * Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file 296 * integrity, default algorithm is MD5 297 */ 298 private String algorithm; 299 300 private long persistenceChunkSize; 301 302 /* Tracing failed Bucket Cache allocations. */ 303 private long allocFailLogPrevTs; // time of previous log event for allocation failure. 304 private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute. 305 306 private transient Map<String, HRegion> onlineRegions; 307 308 private long orphanBlockGracePeriod = 0; 309 310 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 311 int writerThreadNum, int writerQLen, String persistencePath) throws IOException { 312 this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 313 persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create()); 314 } 315 316 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 317 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, 318 Configuration conf) throws IOException { 319 this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 320 persistencePath, ioErrorsTolerationDuration, conf, null); 321 } 322 323 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 324 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, 325 Configuration conf, Map<String, HRegion> onlineRegions) throws IOException { 326 Preconditions.checkArgument(blockSize > 0, 327 "BucketCache capacity is set to " + blockSize + ", can not be less than 0"); 328 boolean useStrongRef = conf.getBoolean(STRONG_REF_KEY, STRONG_REF_DEFAULT); 329 if (useStrongRef) { 330 this.offsetLock = new IdReadWriteLockStrongRef<>(); 331 } else { 332 this.offsetLock = new IdReadWriteLockWithObjectPool<>(ReferenceType.SOFT); 333 } 334 this.conf = conf; 335 this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM); 336 this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath); 337 this.writerThreads = new WriterThread[writerThreadNum]; 338 this.onlineRegions = onlineRegions; 339 this.orphanBlockGracePeriod = 340 conf.getLong(BLOCK_ORPHAN_GRACE_PERIOD, BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT); 341 long blockNumCapacity = capacity / blockSize; 342 if (blockNumCapacity >= Integer.MAX_VALUE) { 343 // Enough for about 32TB of cache! 
344 throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now"); 345 } 346 347 // these sets the dynamic configs 348 this.onConfigurationChange(conf); 349 350 LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor 351 + ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " 352 + singleFactor + ", multiFactor: " + multiFactor + ", memoryFactor: " + memoryFactor 353 + ", useStrongRef: " + useStrongRef); 354 355 this.cacheCapacity = capacity; 356 this.persistencePath = persistencePath; 357 this.blockSize = blockSize; 358 this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; 359 this.cacheState = CacheState.INITIALIZING; 360 361 this.allocFailLogPrevTs = 0; 362 363 for (int i = 0; i < writerThreads.length; ++i) { 364 writerQueues.add(new ArrayBlockingQueue<>(writerQLen)); 365 } 366 367 assert writerQueues.size() == writerThreads.length; 368 this.ramCache = new RAMCache(); 369 370 this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); 371 instantiateWriterThreads(); 372 373 if (isCachePersistent()) { 374 if (ioEngine instanceof FileIOEngine) { 375 startBucketCachePersisterThread(); 376 } 377 startPersistenceRetriever(bucketSizes, capacity); 378 } else { 379 bucketAllocator = new BucketAllocator(capacity, bucketSizes); 380 this.cacheState = CacheState.ENABLED; 381 startWriterThreads(); 382 } 383 384 // Run the statistics thread periodically to print the cache statistics log 385 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log 386 // every five minutes. 387 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, 388 statThreadPeriod, TimeUnit.SECONDS); 389 LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity=" 390 + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize) 391 + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" 392 + persistencePath + ", bucketAllocator=" + BucketAllocator.class.getName()); 393 } 394 395 private void startPersistenceRetriever(int[] bucketSizes, long capacity) { 396 Runnable persistentCacheRetriever = () -> { 397 try { 398 retrieveFromFile(bucketSizes); 399 LOG.info("Persistent bucket cache recovery from {} is complete.", persistencePath); 400 } catch (Throwable ex) { 401 LOG.warn("Can't restore from file[{}]. The bucket cache will be reset and rebuilt." 
402 + " Exception seen: ", persistencePath, ex); 403 backingMap.clear(); 404 fullyCachedFiles.clear(); 405 backingMapValidated.set(true); 406 regionCachedSize.clear(); 407 try { 408 bucketAllocator = new BucketAllocator(capacity, bucketSizes); 409 } catch (BucketAllocatorException allocatorException) { 410 LOG.error("Exception during Bucket Allocation", allocatorException); 411 } 412 } finally { 413 this.cacheState = CacheState.ENABLED; 414 startWriterThreads(); 415 } 416 }; 417 Thread t = new Thread(persistentCacheRetriever); 418 t.start(); 419 } 420 421 private void sanityCheckConfigs() { 422 Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0, 423 ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 424 Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0, 425 MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 426 Preconditions.checkArgument(minFactor <= acceptableFactor, 427 MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME); 428 Preconditions.checkArgument(extraFreeFactor >= 0, 429 EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0"); 430 Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0, 431 SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 432 Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0, 433 MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 434 Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0, 435 MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 436 Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1, 437 SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and " 438 + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0"); 439 if (this.persistenceChunkSize <= 0) { 440 persistenceChunkSize = DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE; 441 } 442 } 443 444 /** 445 * Called by the constructor to instantiate the writer threads. 446 */ 447 private void instantiateWriterThreads() { 448 final String threadName = Thread.currentThread().getName(); 449 for (int i = 0; i < this.writerThreads.length; ++i) { 450 this.writerThreads[i] = new WriterThread(this.writerQueues.get(i)); 451 this.writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); 452 this.writerThreads[i].setDaemon(true); 453 } 454 } 455 456 /** 457 * Called by the constructor to start the writer threads. Used by tests that need to override 458 * starting the threads. 
459 */ 460 protected void startWriterThreads() { 461 for (WriterThread thread : writerThreads) { 462 thread.start(); 463 } 464 } 465 466 void startBucketCachePersisterThread() { 467 LOG.info("Starting BucketCachePersisterThread"); 468 cachePersister = new BucketCachePersister(this, bucketcachePersistInterval); 469 cachePersister.setDaemon(true); 470 cachePersister.start(); 471 } 472 473 @Override 474 public boolean isCacheEnabled() { 475 return this.cacheState == CacheState.ENABLED; 476 } 477 478 @Override 479 public long getMaxSize() { 480 return this.cacheCapacity; 481 } 482 483 public String getIoEngine() { 484 return ioEngine.toString(); 485 } 486 487 /** 488 * Get the IOEngine from the IO engine name 489 * @return the IOEngine 490 */ 491 private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath) 492 throws IOException { 493 if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) { 494 // In order to make the usage simple, we only need the prefix 'files:' in 495 // document whether one or multiple file(s), but also support 'file:' for 496 // the compatibility 497 String[] filePaths = 498 ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER); 499 return new FileIOEngine(capacity, persistencePath != null, filePaths); 500 } else if (ioEngineName.startsWith("offheap")) { 501 return new ByteBufferIOEngine(capacity); 502 } else if (ioEngineName.startsWith("mmap:")) { 503 return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity); 504 } else if (ioEngineName.startsWith("pmem:")) { 505 // This mode of bucket cache creates an IOEngine over a file on the persistent memory 506 // device. Since the persistent memory device has its own address space the contents 507 // mapped to this address space does not get swapped out like in the case of mmapping 508 // on to DRAM. Hence the cells created out of the hfile blocks in the pmem bucket cache 509 // can be directly referred to without having to copy them onheap. Once the RPC is done, 510 // the blocks can be returned back as in case of ByteBufferIOEngine. 511 return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity); 512 } else { 513 throw new IllegalArgumentException( 514 "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap"); 515 } 516 } 517 518 public boolean isCachePersistenceEnabled() { 519 return persistencePath != null; 520 } 521 522 /** 523 * Cache the block with the specified name and buffer. 524 * @param cacheKey block's cache key 525 * @param buf block buffer 526 */ 527 @Override 528 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 529 cacheBlock(cacheKey, buf, false); 530 } 531 532 /** 533 * Cache the block with the specified name and buffer. 534 * @param cacheKey block's cache key 535 * @param cachedItem block buffer 536 * @param inMemory if block is in-memory 537 */ 538 @Override 539 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) { 540 cacheBlockWithWait(cacheKey, cachedItem, inMemory, false); 541 } 542 543 /** 544 * Cache the block with the specified name and buffer. 
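 * When {@code waitWhenCache} is true and {@code hbase.bucketcache.queue.addition.waittime} is set
 * to a positive value, the call may block for up to that long while handing the block to a writer
 * queue.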
545 * @param cacheKey block's cache key 546 * @param cachedItem block buffer 547 * @param inMemory if block is in-memory 548 */ 549 @Override 550 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, 551 boolean waitWhenCache) { 552 cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0); 553 } 554 555 /** 556 * Cache the block to ramCache 557 * @param cacheKey block's cache key 558 * @param cachedItem block buffer 559 * @param inMemory if block is in-memory 560 * @param wait if true, blocking wait when queue is full 561 */ 562 public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, 563 boolean wait) { 564 if (isCacheEnabled()) { 565 if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { 566 if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) { 567 BucketEntry bucketEntry = backingMap.get(cacheKey); 568 if (bucketEntry != null && bucketEntry.isRpcRef()) { 569 // avoid replace when there are RPC refs for the bucket entry in bucket cache 570 return; 571 } 572 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 573 } 574 } else { 575 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 576 } 577 } 578 } 579 580 protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) { 581 return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock); 582 } 583 584 protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, 585 boolean inMemory, boolean wait) { 586 if (!isCacheEnabled()) { 587 return; 588 } 589 if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) { 590 cacheKey.setBlockType(cachedItem.getBlockType()); 591 } 592 LOG.trace("Caching key={}, item={}", cacheKey, cachedItem); 593 // Stuff the entry into the RAM cache so it can get drained to the persistent store 594 RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), 595 inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine); 596 /** 597 * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same 598 * key in ramCache, the heap size of bucket cache need to update if replacing entry from 599 * ramCache. 
But WriterThread will also remove entry from ramCache and update heap size, if 600 * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct 601 * one, then the heap size will mess up (HBASE-20789) 602 */ 603 if (ramCache.putIfAbsent(cacheKey, re) != null) { 604 return; 605 } 606 int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); 607 BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum); 608 boolean successfulAddition = false; 609 if (wait) { 610 try { 611 successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS); 612 } catch (InterruptedException e) { 613 LOG.error("Thread interrupted: ", e); 614 Thread.currentThread().interrupt(); 615 } 616 } else { 617 successfulAddition = bq.offer(re); 618 } 619 if (!successfulAddition) { 620 LOG.debug("Failed to insert block {} into the cache writers queue", cacheKey); 621 ramCache.remove(cacheKey); 622 cacheStats.failInsert(); 623 } else { 624 this.blockNumber.increment(); 625 this.heapSize.add(cachedItem.heapSize()); 626 } 627 } 628 629 /** 630 * If the passed cache key relates to a reference (<hfile>.<parentEncRegion>), this 631 * method looks for the block from the referred file, in the cache. If present in the cache, the 632 * block for the referred file is returned, otherwise, this method returns null. It will also 633 * return null if the passed cache key doesn't relate to a reference. 634 * @param key the BlockCacheKey instance to look for in the cache. 635 * @return the cached block from the referred file, null if there's no such block in the cache or 636 * the passed key doesn't relate to a reference. 637 */ 638 public BucketEntry getBlockForReference(BlockCacheKey key) { 639 BucketEntry foundEntry = null; 640 String referredFileName = null; 641 if (StoreFileInfo.isReference(key.getHfileName())) { 642 referredFileName = StoreFileInfo.getReferredToRegionAndFile(key.getHfileName()).getSecond(); 643 } 644 if (referredFileName != null) { 645 BlockCacheKey convertedCacheKey = new BlockCacheKey(referredFileName, key.getOffset()); 646 foundEntry = backingMap.get(convertedCacheKey); 647 LOG.debug("Got a link/ref: {}. Related cacheKey: {}. Found entry: {}", key.getHfileName(), 648 convertedCacheKey, foundEntry); 649 } 650 return foundEntry; 651 } 652 653 /** 654 * Get the buffer of the block with the specified key. 
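 * The RAM cache is checked first; on a miss there the backing map is consulted, and for reference
 * files the lookup falls back to the referred file's block via
 * {@link #getBlockForReference(BlockCacheKey)}.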
655 * @param key block's cache key 656 * @param caching true if the caller caches blocks on cache misses 657 * @param repeat Whether this is a repeat lookup for the same block 658 * @param updateCacheMetrics Whether we should update cache metrics or not 659 * @return buffer of specified cache key, or null if not in cache 660 */ 661 @Override 662 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, 663 boolean updateCacheMetrics) { 664 if (!isCacheEnabled()) { 665 cacheStats.miss(caching, key.isPrimary(), key.getBlockType()); 666 return null; 667 } 668 RAMQueueEntry re = ramCache.get(key); 669 if (re != null) { 670 if (updateCacheMetrics) { 671 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 672 } 673 re.access(accessCount.incrementAndGet()); 674 return re.getData(); 675 } 676 BucketEntry bucketEntry = backingMap.get(key); 677 if (bucketEntry == null) { 678 bucketEntry = getBlockForReference(key); 679 } 680 if (bucketEntry != null) { 681 long start = System.nanoTime(); 682 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); 683 try { 684 lock.readLock().lock(); 685 // We can not read here even if backingMap does contain the given key because its offset 686 // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check 687 // existence here. 688 if ( 689 bucketEntry.equals(backingMap.get(key)) || bucketEntry.equals(getBlockForReference(key)) 690 ) { 691 // Read the block from IOEngine based on the bucketEntry's offset and length, NOTICE: the 692 // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to 693 // the same BucketEntry, then all of the three will share the same refCnt. 694 Cacheable cachedBlock = ioEngine.read(bucketEntry); 695 if (ioEngine.usesSharedMemory()) { 696 // If IOEngine use shared memory, cachedBlock and BucketEntry will share the 697 // same RefCnt, do retain here, in order to count the number of RPC references 698 cachedBlock.retain(); 699 } 700 // Update the cache statistics. 701 if (updateCacheMetrics) { 702 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 703 cacheStats.ioHit(System.nanoTime() - start); 704 } 705 bucketEntry.access(accessCount.incrementAndGet()); 706 if (this.ioErrorStartTime > 0) { 707 ioErrorStartTime = -1; 708 } 709 return cachedBlock; 710 } 711 } catch (HBaseIOException hioex) { 712 // When using file io engine persistent cache, 713 // the cache map state might differ from the actual cache. 
If we reach this block, 714 // we should remove the cache key entry from the backing map 715 backingMap.remove(key); 716 fullyCachedFiles.remove(key.getHfileName()); 717 LOG.debug("Failed to fetch block for cache key: {}.", key, hioex); 718 } catch (IOException ioex) { 719 LOG.error("Failed reading block " + key + " from bucket cache", ioex); 720 checkIOErrorIsTolerated(); 721 } finally { 722 lock.readLock().unlock(); 723 } 724 } 725 if (!repeat && updateCacheMetrics) { 726 cacheStats.miss(caching, key.isPrimary(), key.getBlockType()); 727 } 728 return null; 729 } 730 731 /** 732 * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap} 733 */ 734 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber, 735 boolean evictedByEvictionProcess) { 736 bucketEntry.markAsEvicted(); 737 blocksByHFile.remove(cacheKey); 738 if (decrementBlockNumber) { 739 this.blockNumber.decrement(); 740 if (ioEngine.isPersistent()) { 741 fileNotFullyCached(cacheKey.getHfileName()); 742 } 743 } 744 if (evictedByEvictionProcess) { 745 cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); 746 } 747 if (ioEngine.isPersistent()) { 748 setCacheInconsistent(true); 749 } 750 } 751 752 private void fileNotFullyCached(String hfileName) { 753 // Update the regionPrefetchedSizeMap before removing the file from prefetchCompleted 754 if (fullyCachedFiles.containsKey(hfileName)) { 755 Pair<String, Long> regionEntry = fullyCachedFiles.get(hfileName); 756 String regionEncodedName = regionEntry.getFirst(); 757 long filePrefetchSize = regionEntry.getSecond(); 758 LOG.debug("Removing file {} for region {}", hfileName, regionEncodedName); 759 regionCachedSize.computeIfPresent(regionEncodedName, (rn, pf) -> pf - filePrefetchSize); 760 // If all the blocks for a region are evicted from the cache, remove the entry for that region 761 if ( 762 regionCachedSize.containsKey(regionEncodedName) 763 && regionCachedSize.get(regionEncodedName) == 0 764 ) { 765 regionCachedSize.remove(regionEncodedName); 766 } 767 } 768 fullyCachedFiles.remove(hfileName); 769 } 770 771 public void fileCacheCompleted(Path filePath, long size) { 772 Pair<String, Long> pair = new Pair<>(); 773 // sets the region name 774 String regionName = filePath.getParent().getParent().getName(); 775 pair.setFirst(regionName); 776 pair.setSecond(size); 777 fullyCachedFiles.put(filePath.getName(), pair); 778 } 779 780 private void updateRegionCachedSize(Path filePath, long cachedSize) { 781 if (filePath != null) { 782 String regionName = filePath.getParent().getParent().getName(); 783 regionCachedSize.merge(regionName, cachedSize, 784 (previousSize, newBlockSize) -> previousSize + newBlockSize); 785 } 786 } 787 788 /** 789 * Free the {{@link BucketEntry} actually,which could only be invoked when the 790 * {@link BucketEntry#refCnt} becoming 0. 791 */ 792 void freeBucketEntry(BucketEntry bucketEntry) { 793 bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength()); 794 realCacheSize.add(-1 * bucketEntry.getLength()); 795 } 796 797 /** 798 * Try to evict the block from {@link BlockCache} by force. We'll call this in few cases:<br> 799 * 1. Close an HFile, and clear all cached blocks. <br> 800 * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br> 801 * <p> 802 * Firstly, we'll try to remove the block from RAMCache,and then try to evict from backingMap. 
803 * Here we evict the block from backingMap immediately, but only free the reference from bucket 804 * cache by calling {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this 805 * block, block can only be de-allocated when all of them release the block. 806 * <p> 807 * NOTICE: we need to grab the write offset lock firstly before releasing the reference from 808 * bucket cache. if we don't, we may read an {@link BucketEntry} with refCnt = 0 when 809 * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, it's a memory leak. 810 * @param cacheKey Block to evict 811 * @return true to indicate whether we've evicted successfully or not. 812 */ 813 @Override 814 public boolean evictBlock(BlockCacheKey cacheKey) { 815 return doEvictBlock(cacheKey, null, false); 816 } 817 818 /** 819 * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap} and 820 * {@link BucketCache#ramCache}. <br/> 821 * NOTE:When Evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and 822 * {@link BucketEntry} could be removed. 823 * @param cacheKey {@link BlockCacheKey} to evict. 824 * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict. 825 * @return true to indicate whether we've evicted successfully or not. 826 */ 827 private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry, 828 boolean evictedByEvictionProcess) { 829 if (!isCacheEnabled()) { 830 return false; 831 } 832 boolean existedInRamCache = removeFromRamCache(cacheKey); 833 if (bucketEntry == null) { 834 bucketEntry = backingMap.get(cacheKey); 835 } 836 final BucketEntry bucketEntryToUse = bucketEntry; 837 838 if (bucketEntryToUse == null) { 839 if (existedInRamCache && evictedByEvictionProcess) { 840 cacheStats.evicted(0, cacheKey.isPrimary()); 841 } 842 return existedInRamCache; 843 } else { 844 return bucketEntryToUse.withWriteLock(offsetLock, () -> { 845 if (backingMap.remove(cacheKey, bucketEntryToUse)) { 846 LOG.debug("removed key {} from back map with offset lock {} in the evict process", 847 cacheKey, bucketEntryToUse.offset()); 848 blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess); 849 return true; 850 } 851 return false; 852 }); 853 } 854 } 855 856 /** 857 * <pre> 858 * Create the {@link Recycler} for {@link BucketEntry#refCnt},which would be used as 859 * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}. 860 * NOTE: for {@link BucketCache#getBlock},the {@link RefCnt#recycler} of {@link HFileBlock#buf} 861 * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different: 862 * 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap}, 863 * it is the return value of current {@link BucketCache#createRecycler} method. 864 * 865 * 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache}, 866 * it is {@link ByteBuffAllocator#putbackBuffer}. 867 * </pre> 868 */ 869 public Recycler createRecycler(final BucketEntry bucketEntry) { 870 return () -> { 871 freeBucketEntry(bucketEntry); 872 return; 873 }; 874 } 875 876 /** 877 * NOTE: This method is only for test. 
878 */
879 public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) {
880 BucketEntry bucketEntry = backingMap.get(blockCacheKey);
881 if (bucketEntry == null) {
882 return false;
883 }
884 return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry);
885 }
886
887 /**
888 * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if
889 * {@link BucketEntry#isRpcRef} is false. <br/>
890 * NOTE: When evicting from {@link BucketCache#backingMap}, only the matched {@link BlockCacheKey}
891 * and {@link BucketEntry} could be removed.
892 * @param blockCacheKey {@link BlockCacheKey} to evict.
893 * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict.
894 * @return true if the block was evicted successfully, false otherwise.
895 */
896 boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) {
897 if (!bucketEntry.isRpcRef()) {
898 return doEvictBlock(blockCacheKey, bucketEntry, true);
899 }
900 return false;
901 }
902
903 /**
904 * Since HBASE-29249, the following properties governing freeSpace behaviour and block priorities
905 * were made dynamically configurable: hbase.bucketcache.acceptfactor,
906 * hbase.bucketcache.minfactor, hbase.bucketcache.extrafreefactor,
907 * hbase.bucketcache.single.factor, hbase.bucketcache.multi.factor and
908 * hbase.bucketcache.memory.factor. The
909 * hbase.bucketcache.queue.addition.waittime property allows for introducing a delay in the
910 * publishing of blocks for the cache writer threads during prefetch reads only (client reads
911 * wouldn't get delayed). It has also been made dynamically configurable since HBASE-29249. The
912 * hbase.bucketcache.persist.intervalinmillis property determines the frequency for saving the
913 * persistent cache, and it has also been made dynamically configurable since HBASE-29249. The
914 * hbase.bucketcache.persistence.chunksize property determines the size of the persistent file
915 * splits (due to the limitation of maximum allowed protobuf size), and it has also been made
916 * dynamically configurable since HBASE-29249.
917 * @param config the new configuration to apply.
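 * <p>
 * For example, assuming {@code cacheConf} is the same Configuration instance this cache was
 * created with and {@code bucketCache} is this cache (both names are illustrative), the factors
 * could be adjusted at runtime roughly as follows:
 *
 * <pre>
 * {@code
 * cacheConf.setFloat("hbase.bucketcache.acceptfactor", 0.90f);
 * cacheConf.setFloat("hbase.bucketcache.minfactor", 0.80f);
 * // Re-reads the dynamic settings and re-validates them via sanityCheckConfigs().
 * bucketCache.onConfigurationChange(cacheConf);
 * }
 * </pre>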
918 */ 919 @Override 920 public void onConfigurationChange(Configuration config) { 921 this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR); 922 this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR); 923 this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR); 924 this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); 925 this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); 926 this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); 927 this.queueAdditionWaitTime = 928 conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME); 929 this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000); 930 this.persistenceChunkSize = 931 conf.getLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE); 932 sanityCheckConfigs(); 933 } 934 935 protected boolean removeFromRamCache(BlockCacheKey cacheKey) { 936 return ramCache.remove(cacheKey, re -> { 937 if (re != null) { 938 this.blockNumber.decrement(); 939 this.heapSize.add(-1 * re.getData().heapSize()); 940 } 941 }); 942 } 943 944 public boolean isCacheInconsistent() { 945 return isCacheInconsistent.get(); 946 } 947 948 public void setCacheInconsistent(boolean setCacheInconsistent) { 949 isCacheInconsistent.set(setCacheInconsistent); 950 } 951 952 protected void setCacheState(CacheState state) { 953 cacheState = state; 954 } 955 956 /* 957 * Statistics thread. Periodically output cache statistics to the log. 958 */ 959 private static class StatisticsThread extends Thread { 960 private final BucketCache bucketCache; 961 962 public StatisticsThread(BucketCache bucketCache) { 963 super("BucketCacheStatsThread"); 964 setDaemon(true); 965 this.bucketCache = bucketCache; 966 } 967 968 @Override 969 public void run() { 970 bucketCache.logStats(); 971 } 972 } 973 974 public void logStats() { 975 if (!isCacheInitialized("BucketCache::logStats")) { 976 return; 977 } 978 979 long totalSize = bucketAllocator.getTotalSize(); 980 long usedSize = bucketAllocator.getUsedSize(); 981 long freeSize = totalSize - usedSize; 982 long cacheSize = getRealCacheSize(); 983 LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + "totalSize=" 984 + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", " 985 + "usedSize=" + StringUtils.byteDesc(usedSize) + ", " + "cacheSize=" 986 + StringUtils.byteDesc(cacheSize) + ", " + "accesses=" + cacheStats.getRequestCount() + ", " 987 + "hits=" + cacheStats.getHitCount() + ", " + "IOhitsPerSecond=" 988 + cacheStats.getIOHitsPerSecond() + ", " + "IOTimePerHit=" 989 + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " + "hitRatio=" 990 + (cacheStats.getHitCount() == 0 991 ? "0," 992 : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", ")) 993 + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits=" 994 + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio=" 995 + (cacheStats.getHitCachingCount() == 0 996 ? 
"0," 997 : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", ")) 998 + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted=" 999 + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction() 1000 + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount() + ", blocksCount=" 1001 + backingMap.size()); 1002 cacheStats.reset(); 1003 1004 bucketAllocator.logDebugStatistics(); 1005 } 1006 1007 public long getRealCacheSize() { 1008 return this.realCacheSize.sum(); 1009 } 1010 1011 public long acceptableSize() { 1012 if (!isCacheInitialized("BucketCache::acceptableSize")) { 1013 return 0; 1014 } 1015 return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor); 1016 } 1017 1018 long getPartitionSize(float partitionFactor) { 1019 if (!isCacheInitialized("BucketCache::getPartitionSize")) { 1020 return 0; 1021 } 1022 1023 return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor); 1024 } 1025 1026 /** 1027 * Return the count of bucketSizeinfos still need free space 1028 */ 1029 private int bucketSizesAboveThresholdCount(float minFactor) { 1030 if (!isCacheInitialized("BucketCache::bucketSizesAboveThresholdCount")) { 1031 return 0; 1032 } 1033 1034 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 1035 int fullCount = 0; 1036 for (int i = 0; i < stats.length; i++) { 1037 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 1038 freeGoal = Math.max(freeGoal, 1); 1039 if (stats[i].freeCount() < freeGoal) { 1040 fullCount++; 1041 } 1042 } 1043 return fullCount; 1044 } 1045 1046 /** 1047 * This method will find the buckets that are minimally occupied and are not reference counted and 1048 * will free them completely without any constraint on the access times of the elements, and as a 1049 * process will completely free at most the number of buckets passed, sometimes it might not due 1050 * to changing refCounts 1051 * @param completelyFreeBucketsNeeded number of buckets to free 1052 **/ 1053 private void freeEntireBuckets(int completelyFreeBucketsNeeded) { 1054 if (!isCacheInitialized("BucketCache::freeEntireBuckets")) { 1055 return; 1056 } 1057 1058 if (completelyFreeBucketsNeeded != 0) { 1059 // First we will build a set where the offsets are reference counted, usually 1060 // this set is small around O(Handler Count) unless something else is wrong 1061 Set<Integer> inUseBuckets = new HashSet<>(); 1062 backingMap.forEach((k, be) -> { 1063 if (be.isRpcRef()) { 1064 inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset())); 1065 } 1066 }); 1067 Set<Integer> candidateBuckets = 1068 bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded); 1069 for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) { 1070 if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) { 1071 evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue()); 1072 } 1073 } 1074 } 1075 } 1076 1077 private long calculateBytesToFree(StringBuilder msgBuffer) { 1078 long bytesToFreeWithoutExtra = 0; 1079 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 1080 long[] bytesToFreeForBucket = new long[stats.length]; 1081 for (int i = 0; i < stats.length; i++) { 1082 bytesToFreeForBucket[i] = 0; 1083 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 1084 freeGoal = Math.max(freeGoal, 1); 1085 if (stats[i].freeCount() < freeGoal) { 1086 
bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount()); 1087 bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; 1088 if (msgBuffer != null) { 1089 msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" 1090 + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); 1091 } 1092 } 1093 } 1094 if (msgBuffer != null) { 1095 msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); 1096 } 1097 return bytesToFreeWithoutExtra; 1098 } 1099 1100 /** 1101 * Free the space if the used size reaches acceptableSize() or one size block couldn't be 1102 * allocated. When freeing the space, we use the LRU algorithm and ensure there must be some 1103 * blocks evicted 1104 * @param why Why we are being called 1105 */ 1106 void freeSpace(final String why) { 1107 if (!isCacheInitialized("BucketCache::freeSpace")) { 1108 return; 1109 } 1110 // Ensure only one freeSpace progress at a time 1111 if (!freeSpaceLock.tryLock()) { 1112 return; 1113 } 1114 try { 1115 freeInProgress = true; 1116 StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null; 1117 long bytesToFreeWithoutExtra = calculateBytesToFree(msgBuffer); 1118 if (bytesToFreeWithoutExtra <= 0) { 1119 return; 1120 } 1121 long currentSize = bucketAllocator.getUsedSize(); 1122 long totalSize = bucketAllocator.getTotalSize(); 1123 if (LOG.isDebugEnabled() && msgBuffer != null) { 1124 LOG.debug("Free started because \"" + why + "\"; " + msgBuffer + " of current used=" 1125 + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" 1126 + StringUtils.byteDesc(realCacheSize.sum()) + ", total=" 1127 + StringUtils.byteDesc(totalSize)); 1128 } 1129 long bytesToFreeWithExtra = 1130 (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor)); 1131 // Instantiate priority buckets 1132 BucketEntryGroup bucketSingle = 1133 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor)); 1134 BucketEntryGroup bucketMulti = 1135 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(multiFactor)); 1136 BucketEntryGroup bucketMemory = 1137 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor)); 1138 1139 Set<String> allValidFiles = null; 1140 // We need the region/stores/files tree, in order to figure out if a block is "orphan" or not. 1141 // See further comments below for more details. 1142 if (onlineRegions != null) { 1143 allValidFiles = BlockCacheUtil.listAllFilesNames(onlineRegions); 1144 } 1145 // the cached time is recored in nanos, so we need to convert the grace period accordingly 1146 long orphanGracePeriodNanos = orphanBlockGracePeriod * 1000000; 1147 long bytesFreed = 0; 1148 // Scan entire map putting bucket entry into appropriate bucket entry 1149 // group 1150 for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) { 1151 BlockCacheKey key = bucketEntryWithKey.getKey(); 1152 BucketEntry entry = bucketEntryWithKey.getValue(); 1153 // Under certain conditions, blocks for regions not on the current region server might 1154 // be hanging on the cache. For example, when using the persistent cache feature, if the 1155 // RS crashes, then if not the same regions are assigned back once its online again, blocks 1156 // for the previous online regions would be recovered and stay in the cache. These would be 1157 // "orphan" blocks, as the files these blocks belong to are not in any of the online 1158 // regions. 
1159 // "Orphan" blocks are a pure waste of cache space and should be evicted first during 1160 // the freespace run. 1161 // Compactions and Flushes may cache blocks before its files are completely written. In 1162 // these cases the file won't be found in any of the online regions stores, but the block 1163 // shouldn't be evicted. To avoid this, we defined this 1164 // hbase.bucketcache.block.orphan.evictgraceperiod property, to account for a grace 1165 // period (default 24 hours) where a block should be checked if it's an orphan block. 1166 if ( 1167 allValidFiles != null 1168 && entry.getCachedTime() < (System.nanoTime() - orphanGracePeriodNanos) 1169 ) { 1170 if (!allValidFiles.contains(key.getHfileName())) { 1171 if (evictBucketEntryIfNoRpcReferenced(key, entry)) { 1172 // We calculate the freed bytes, but we don't stop if the goal was reached because 1173 // these are orphan blocks anyway, so let's leverage this run of freeSpace 1174 // to get rid of all orphans at once. 1175 bytesFreed += entry.getLength(); 1176 continue; 1177 } 1178 } 1179 } 1180 switch (entry.getPriority()) { 1181 case SINGLE: { 1182 bucketSingle.add(bucketEntryWithKey); 1183 break; 1184 } 1185 case MULTI: { 1186 bucketMulti.add(bucketEntryWithKey); 1187 break; 1188 } 1189 case MEMORY: { 1190 bucketMemory.add(bucketEntryWithKey); 1191 break; 1192 } 1193 } 1194 } 1195 PriorityQueue<BucketEntryGroup> bucketQueue = 1196 new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow)); 1197 1198 bucketQueue.add(bucketSingle); 1199 bucketQueue.add(bucketMulti); 1200 bucketQueue.add(bucketMemory); 1201 1202 int remainingBuckets = bucketQueue.size(); 1203 1204 BucketEntryGroup bucketGroup; 1205 while ((bucketGroup = bucketQueue.poll()) != null) { 1206 long overflow = bucketGroup.overflow(); 1207 if (overflow > 0) { 1208 long bucketBytesToFree = 1209 Math.min(overflow, (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets); 1210 bytesFreed += bucketGroup.free(bucketBytesToFree); 1211 } 1212 remainingBuckets--; 1213 } 1214 1215 // Check and free if there are buckets that still need freeing of space 1216 if (bucketSizesAboveThresholdCount(minFactor) > 0) { 1217 bucketQueue.clear(); 1218 remainingBuckets = 3; 1219 bucketQueue.add(bucketSingle); 1220 bucketQueue.add(bucketMulti); 1221 bucketQueue.add(bucketMemory); 1222 while ((bucketGroup = bucketQueue.poll()) != null) { 1223 long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets; 1224 bytesFreed += bucketGroup.free(bucketBytesToFree); 1225 remainingBuckets--; 1226 } 1227 } 1228 // Even after the above free we might still need freeing because of the 1229 // De-fragmentation of the buckets (also called Slab Calcification problem), i.e 1230 // there might be some buckets where the occupancy is very sparse and thus are not 1231 // yielding the free for the other bucket sizes, the fix for this to evict some 1232 // of the buckets, we do this by evicting the buckets that are least fulled 1233 freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * bucketSizesAboveThresholdCount(1.0f)); 1234 1235 if (LOG.isDebugEnabled()) { 1236 long single = bucketSingle.totalSize(); 1237 long multi = bucketMulti.totalSize(); 1238 long memory = bucketMemory.totalSize(); 1239 if (LOG.isDebugEnabled()) { 1240 LOG.debug("Bucket cache free space completed; " + "freed=" 1241 + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc(totalSize) 1242 + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi=" 1243 + 
StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory)); 1244 } 1245 } 1246 } catch (Throwable t) { 1247 LOG.warn("Failed freeing space", t); 1248 } finally { 1249 cacheStats.evict(); 1250 freeInProgress = false; 1251 freeSpaceLock.unlock(); 1252 } 1253 } 1254 1255 // This handles flushing the RAM cache to IOEngine. 1256 class WriterThread extends Thread { 1257 private final BlockingQueue<RAMQueueEntry> inputQueue; 1258 private volatile boolean writerEnabled = true; 1259 private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE); 1260 1261 WriterThread(BlockingQueue<RAMQueueEntry> queue) { 1262 super("BucketCacheWriterThread"); 1263 this.inputQueue = queue; 1264 } 1265 1266 // Used for test 1267 void disableWriter() { 1268 this.writerEnabled = false; 1269 } 1270 1271 @Override 1272 public void run() { 1273 List<RAMQueueEntry> entries = new ArrayList<>(); 1274 try { 1275 while (isCacheEnabled() && writerEnabled) { 1276 try { 1277 try { 1278 // Blocks 1279 entries = getRAMQueueEntries(inputQueue, entries); 1280 } catch (InterruptedException ie) { 1281 if (!isCacheEnabled() || !writerEnabled) { 1282 break; 1283 } 1284 } 1285 doDrain(entries, metaBuff); 1286 } catch (Exception ioe) { 1287 LOG.error("WriterThread encountered error", ioe); 1288 } 1289 } 1290 } catch (Throwable t) { 1291 LOG.warn("Failed doing drain", t); 1292 } 1293 LOG.info(this.getName() + " exiting, cacheEnabled=" + isCacheEnabled()); 1294 } 1295 } 1296 1297 /** 1298 * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing 1299 * cache with a new block for the same cache key. there's a corner case: one thread cache a block 1300 * in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another new block 1301 * with the same cache key do the same thing for the same cache key, so if not evict the previous 1302 * bucket entry, then memory leak happen because the previous bucketEntry is gone but the 1303 * bucketAllocator do not free its memory. 1304 * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey 1305 * cacheKey, Cacheable newBlock) 1306 * @param key Block cache key 1307 * @param bucketEntry Bucket entry to put into backingMap. 1308 */ 1309 protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { 1310 BucketEntry previousEntry = backingMap.put(key, bucketEntry); 1311 blocksByHFile.add(key); 1312 updateRegionCachedSize(key.getFilePath(), bucketEntry.getLength()); 1313 if (previousEntry != null && previousEntry != bucketEntry) { 1314 previousEntry.withWriteLock(offsetLock, () -> { 1315 blockEvicted(key, previousEntry, false, false); 1316 return null; 1317 }); 1318 } 1319 } 1320 1321 /** 1322 * Prepare and return a warning message for Bucket Allocator Exception 1323 * @param fle The exception 1324 * @param re The RAMQueueEntry for which the exception was thrown. 1325 * @return A warning message created from the input RAMQueueEntry object. 
1326 */ 1327 private static String getAllocationFailWarningMessage(final BucketAllocatorException fle, 1328 final RAMQueueEntry re) { 1329 final StringBuilder sb = new StringBuilder(); 1330 sb.append("Most recent failed allocation after "); 1331 sb.append(ALLOCATION_FAIL_LOG_TIME_PERIOD); 1332 sb.append(" ms;"); 1333 if (re != null) { 1334 if (re.getData() instanceof HFileBlock) { 1335 final HFileContext fileContext = ((HFileBlock) re.getData()).getHFileContext(); 1336 final String columnFamily = Bytes.toString(fileContext.getColumnFamily()); 1337 final String tableName = Bytes.toString(fileContext.getTableName()); 1338 if (tableName != null) { 1339 sb.append(" Table: "); 1340 sb.append(tableName); 1341 } 1342 if (columnFamily != null) { 1343 sb.append(" CF: "); 1344 sb.append(columnFamily); 1345 } 1346 sb.append(" HFile: "); 1347 if (fileContext.getHFileName() != null) { 1348 sb.append(fileContext.getHFileName()); 1349 } else { 1350 sb.append(re.getKey()); 1351 } 1352 } else { 1353 sb.append(" HFile: "); 1354 sb.append(re.getKey()); 1355 } 1356 } 1357 sb.append(" Message: "); 1358 sb.append(fle.getMessage()); 1359 return sb.toString(); 1360 } 1361 1362 /** 1363 * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. Process all that 1364 * are passed in even if failure being sure to remove from ramCache else we'll never undo the 1365 * references and we'll OOME. 1366 * @param entries Presumes list passed in here will be processed by this invocation only. No 1367 * interference expected. 1368 */ 1369 void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException { 1370 if (entries.isEmpty()) { 1371 return; 1372 } 1373 // This method is a little hard to follow. We run through the passed in entries and for each 1374 // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must 1375 // do cleanup making sure we've cleared ramCache of all entries regardless of whether we 1376 // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by 1377 // filling ramCache. We do the clean up by again running through the passed in entries 1378 // doing extra work when we find a non-null bucketEntries corresponding entry. 1379 final int size = entries.size(); 1380 BucketEntry[] bucketEntries = new BucketEntry[size]; 1381 // Index updated inside loop if success or if we can't succeed. We retry if cache is full 1382 // when we go to add an entry by going around the loop again without upping the index. 1383 int index = 0; 1384 while (isCacheEnabled() && index < size) { 1385 RAMQueueEntry re = null; 1386 try { 1387 re = entries.get(index); 1388 if (re == null) { 1389 LOG.warn("Couldn't get entry or changed on us; who else is messing with it?"); 1390 index++; 1391 continue; 1392 } 1393 // Reset the position for reuse. 1394 // It should be guaranteed that the data in the metaBuff has been transferred to the 1395 // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already 1396 // transferred with our current IOEngines. Should take care, when we have new kinds of 1397 // IOEngine in the future. 1398 metaBuff.clear(); 1399 BucketEntry bucketEntry = 1400 re.writeToCache(ioEngine, bucketAllocator, realCacheSize, this::createRecycler, metaBuff); 1401 // Successfully added. Up index and add bucketEntry. Clear io exceptions. 
1402 bucketEntries[index] = bucketEntry; 1403 if (ioErrorStartTime > 0) { 1404 ioErrorStartTime = -1; 1405 } 1406 index++; 1407 } catch (BucketAllocatorException fle) { 1408 long currTs = EnvironmentEdgeManager.currentTime(); 1409 cacheStats.allocationFailed(); // Record the warning. 1410 if ( 1411 allocFailLogPrevTs == 0 || (currTs - allocFailLogPrevTs) > ALLOCATION_FAIL_LOG_TIME_PERIOD 1412 ) { 1413 LOG.warn(getAllocationFailWarningMessage(fle, re)); 1414 allocFailLogPrevTs = currTs; 1415 } 1416 // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below. 1417 bucketEntries[index] = null; 1418 index++; 1419 } catch (CacheFullException cfe) { 1420 // Cache full when we tried to add. Try freeing space and then retrying (don't up index) 1421 if (!freeInProgress) { 1422 freeSpace("Full!"); 1423 } else { 1424 Thread.sleep(50); 1425 } 1426 } catch (IOException ioex) { 1427 // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem. 1428 LOG.error("Failed writing to bucket cache", ioex); 1429 checkIOErrorIsTolerated(); 1430 } 1431 } 1432 1433 // Make sure data pages are written on media before we update maps. 1434 try { 1435 ioEngine.sync(); 1436 } catch (IOException ioex) { 1437 LOG.error("Failed syncing IO engine", ioex); 1438 checkIOErrorIsTolerated(); 1439 // Since we failed sync, free the blocks in bucket allocator 1440 for (int i = 0; i < entries.size(); ++i) { 1441 BucketEntry bucketEntry = bucketEntries[i]; 1442 if (bucketEntry != null) { 1443 bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength()); 1444 bucketEntries[i] = null; 1445 } 1446 } 1447 } 1448 1449 // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if 1450 // success or error. 1451 for (int i = 0; i < size; ++i) { 1452 BlockCacheKey key = entries.get(i).getKey(); 1453 // Only add if non-null entry. 1454 if (bucketEntries[i] != null) { 1455 putIntoBackingMap(key, bucketEntries[i]); 1456 if (ioEngine.isPersistent()) { 1457 setCacheInconsistent(true); 1458 } 1459 } 1460 // Always remove from ramCache even if we failed adding it to the block cache above. 1461 boolean existed = ramCache.remove(key, re -> { 1462 if (re != null) { 1463 heapSize.add(-1 * re.getData().heapSize()); 1464 } 1465 }); 1466 if (!existed && bucketEntries[i] != null) { 1467 // Block should have already been evicted. Remove it and free space. 1468 final BucketEntry bucketEntry = bucketEntries[i]; 1469 bucketEntry.withWriteLock(offsetLock, () -> { 1470 if (backingMap.remove(key, bucketEntry)) { 1471 blockEvicted(key, bucketEntry, false, false); 1472 } 1473 return null; 1474 }); 1475 } 1476 } 1477 1478 long used = bucketAllocator.getUsedSize(); 1479 if (used > acceptableSize()) { 1480 freeSpace("Used=" + used + " > acceptable=" + acceptableSize()); 1481 } 1482 return; 1483 } 1484 1485 /** 1486 * Blocks until elements available in {@code q} then tries to grab as many as possible before 1487 * returning. 1488 * @param receptacle Where to stash the elements taken from queue. We clear before we use it just 1489 * in case. 1490 * @param q The queue to take from. 1491 * @return {@code receptacle} laden with elements taken from the queue or empty if none found. 1492 */ 1493 static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q, 1494 List<RAMQueueEntry> receptacle) throws InterruptedException { 1495 // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it 1496 // ok even if list grew to accommodate thousands. 
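// Block on take() until at least one entry arrives, then opportunistically drain whatever else is already queued so a single doDrain() call can process a batch.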
1497 receptacle.clear(); 1498 receptacle.add(q.take()); 1499 q.drainTo(receptacle); 1500 return receptacle; 1501 } 1502 1503 /** 1504 * @see #retrieveFromFile(int[]) 1505 */ 1506 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION", 1507 justification = "false positive, try-with-resources ensures close is called.") 1508 void persistToFile() throws IOException { 1509 LOG.debug("Thread {} started persisting bucket cache to file", 1510 Thread.currentThread().getName()); 1511 if (!isCachePersistent()) { 1512 throw new IOException("Attempt to persist non-persistent cache mappings!"); 1513 } 1514 File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime()); 1515 try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) { 1516 LOG.debug("Persist in new chunked persistence format."); 1517 1518 persistChunkedBackingMap(fos); 1519 1520 LOG.debug( 1521 "PersistToFile: after persisting backing map size: {}, fullycachedFiles size: {}," 1522 + " file name: {}", 1523 backingMap.size(), fullyCachedFiles.size(), tempPersistencePath.getName()); 1524 } catch (IOException e) { 1525 LOG.error("Failed to persist bucket cache to file", e); 1526 throw e; 1527 } catch (Throwable e) { 1528 LOG.error("Failed during persist bucket cache to file: ", e); 1529 throw e; 1530 } 1531 LOG.debug("Thread {} finished persisting bucket cache to file, renaming", 1532 Thread.currentThread().getName()); 1533 if (!tempPersistencePath.renameTo(new File(persistencePath))) { 1534 LOG.warn("Failed to commit cache persistent file. We might lose cached blocks if " 1535 + "RS crashes/restarts before we successfully checkpoint again."); 1536 } 1537 } 1538 1539 public boolean isCachePersistent() { 1540 return ioEngine.isPersistent() && persistencePath != null; 1541 } 1542 1543 @Override 1544 public Optional<Map<String, Long>> getRegionCachedInfo() { 1545 return Optional.of(Collections.unmodifiableMap(regionCachedSize)); 1546 } 1547 1548 /** 1549 * @see #persistToFile() 1550 */ 1551 private void retrieveFromFile(int[] bucketSizes) throws IOException { 1552 LOG.info("Started retrieving bucket cache from file"); 1553 File persistenceFile = new File(persistencePath); 1554 if (!persistenceFile.exists()) { 1555 LOG.warn("Persistence file missing! " 1556 + "It's ok if it's first run after enabling persistent cache."); 1557 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1558 blockNumber.add(backingMap.size()); 1559 backingMapValidated.set(true); 1560 return; 1561 } 1562 assert !isCacheEnabled(); 1563 1564 try (FileInputStream in = new FileInputStream(persistenceFile)) { 1565 int pblen = ProtobufMagic.lengthOfPBMagic(); 1566 byte[] pbuf = new byte[pblen]; 1567 IOUtils.readFully(in, pbuf, 0, pblen); 1568 if (ProtobufMagic.isPBMagicPrefix(pbuf)) { 1569 LOG.info("Reading old format of persistence."); 1570 // The old non-chunked version of backing map persistence. 1571 parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in)); 1572 } else if (Arrays.equals(pbuf, BucketProtoUtils.PB_MAGIC_V2)) { 1573 // The new persistence format of chunked persistence. 1574 LOG.info("Reading new chunked format of persistence."); 1575 retrieveChunkedBackingMap(in); 1576 } else { 1577 // In 3.0 we have enough flexibility to dump the old cache data. 1578 // TODO: In 2.x line, this might need to be filled in to support reading the old format 1579 throw new IOException( 1580 "Persistence file does not start with protobuf magic number. 
" + persistencePath); 1581 } 1582 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1583 blockNumber.add(backingMap.size()); 1584 LOG.info("Bucket cache retrieved from file successfully with size: {}", backingMap.size()); 1585 } 1586 } 1587 1588 private void updateRegionSizeMapWhileRetrievingFromFile() { 1589 // Update the regionCachedSize with the region size while restarting the region server 1590 if (LOG.isDebugEnabled()) { 1591 LOG.debug("Updating region size map after retrieving cached file list"); 1592 dumpPrefetchList(); 1593 } 1594 regionCachedSize.clear(); 1595 fullyCachedFiles.forEach((hFileName, hFileSize) -> { 1596 // Get the region name for each file 1597 String regionEncodedName = hFileSize.getFirst(); 1598 long cachedFileSize = hFileSize.getSecond(); 1599 regionCachedSize.merge(regionEncodedName, cachedFileSize, 1600 (oldpf, fileSize) -> oldpf + fileSize); 1601 }); 1602 } 1603 1604 private void dumpPrefetchList() { 1605 for (Map.Entry<String, Pair<String, Long>> outerEntry : fullyCachedFiles.entrySet()) { 1606 LOG.debug("Cached File Entry:<{},<{},{}>>", outerEntry.getKey(), 1607 outerEntry.getValue().getFirst(), outerEntry.getValue().getSecond()); 1608 } 1609 } 1610 1611 private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) 1612 throws IOException { 1613 if (capacitySize != cacheCapacity) { 1614 throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc(capacitySize) 1615 + ", expected: " + StringUtils.byteDesc(cacheCapacity)); 1616 } 1617 if (!ioEngine.getClass().getName().equals(ioclass)) { 1618 throw new IOException("Class name for IO engine mismatch: " + ioclass + ", expected:" 1619 + ioEngine.getClass().getName()); 1620 } 1621 if (!backingMap.getClass().getName().equals(mapclass)) { 1622 throw new IOException("Class name for cache map mismatch: " + mapclass + ", expected:" 1623 + backingMap.getClass().getName()); 1624 } 1625 } 1626 1627 private void verifyFileIntegrity(BucketCacheProtos.BucketCacheEntry proto) { 1628 try { 1629 if (proto.hasChecksum()) { 1630 ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), 1631 algorithm); 1632 } 1633 backingMapValidated.set(true); 1634 } catch (IOException e) { 1635 LOG.warn("Checksum for cache file failed. " 1636 + "We need to validate each cache key in the backing map. " 1637 + "This may take some time, so we'll do it in a background thread,"); 1638 1639 Runnable cacheValidator = () -> { 1640 while (bucketAllocator == null) { 1641 try { 1642 Thread.sleep(50); 1643 } catch (InterruptedException ex) { 1644 throw new RuntimeException(ex); 1645 } 1646 } 1647 long startTime = EnvironmentEdgeManager.currentTime(); 1648 int totalKeysOriginally = backingMap.size(); 1649 for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) { 1650 try { 1651 ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue()); 1652 } catch (IOException e1) { 1653 LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey()); 1654 evictBlock(keyEntry.getKey()); 1655 fileNotFullyCached(keyEntry.getKey().getHfileName()); 1656 } 1657 } 1658 backingMapValidated.set(true); 1659 LOG.info("Finished validating {} keys in the backing map. Recovered: {}. 
This took {}ms.", 1660 totalKeysOriginally, backingMap.size(), 1661 (EnvironmentEdgeManager.currentTime() - startTime)); 1662 }; 1663 Thread t = new Thread(cacheValidator); 1664 t.setDaemon(true); 1665 t.start(); 1666 } 1667 } 1668 1669 private void updateCacheIndex(BucketCacheProtos.BackingMap chunk, 1670 java.util.Map<java.lang.Integer, java.lang.String> deserializer) throws IOException { 1671 Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair2 = 1672 BucketProtoUtils.fromPB(deserializer, chunk, this::createRecycler); 1673 backingMap.putAll(pair2.getFirst()); 1674 blocksByHFile.addAll(pair2.getSecond()); 1675 } 1676 1677 private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException { 1678 Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair = 1679 BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), 1680 this::createRecycler); 1681 backingMap = pair.getFirst(); 1682 blocksByHFile = pair.getSecond(); 1683 fullyCachedFiles.clear(); 1684 fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap())); 1685 1686 LOG.info("After retrieval, backing map size: {}, fullyCachedFiles size: {}", backingMap.size(), 1687 fullyCachedFiles.size()); 1688 1689 verifyFileIntegrity(proto); 1690 updateRegionSizeMapWhileRetrievingFromFile(); 1691 verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); 1692 } 1693 1694 private void persistChunkedBackingMap(FileOutputStream fos) throws IOException { 1695 LOG.debug( 1696 "persistToFile: before persisting backing map size: {}, " 1697 + "fullycachedFiles size: {}, chunkSize: {}", 1698 backingMap.size(), fullyCachedFiles.size(), persistenceChunkSize); 1699 1700 BucketProtoUtils.serializeAsPB(this, fos, persistenceChunkSize); 1701 1702 LOG.debug( 1703 "persistToFile: after persisting backing map size: {}, " + "fullycachedFiles size: {}", 1704 backingMap.size(), fullyCachedFiles.size()); 1705 } 1706 1707 private void retrieveChunkedBackingMap(FileInputStream in) throws IOException { 1708 1709 // Read the first chunk that has all the details. 1710 BucketCacheProtos.BucketCacheEntry cacheEntry = 1711 BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in); 1712 1713 fullyCachedFiles.clear(); 1714 fullyCachedFiles.putAll(BucketProtoUtils.fromPB(cacheEntry.getCachedFilesMap())); 1715 1716 backingMap.clear(); 1717 blocksByHFile.clear(); 1718 1719 // Read the backing map entries in batches. 1720 int numChunks = 0; 1721 while (in.available() > 0) { 1722 updateCacheIndex(BucketCacheProtos.BackingMap.parseDelimitedFrom(in), 1723 cacheEntry.getDeserializersMap()); 1724 numChunks++; 1725 } 1726 1727 LOG.info("Retrieved {} chunks with blockCount = {}.", numChunks, backingMap.size()); 1728 verifyFileIntegrity(cacheEntry); 1729 verifyCapacityAndClasses(cacheEntry.getCacheCapacity(), cacheEntry.getIoClass(), 1730 cacheEntry.getMapClass()); 1731 updateRegionSizeMapWhileRetrievingFromFile(); 1732 } 1733 1734 /** 1735 * Check whether we tolerate IO error this time.
If the duration of IOEngine throwing errors 1736 * exceeds ioErrorsTolerationDuration, we will disable the cache. 1737 */ 1738 private void checkIOErrorIsTolerated() { 1739 long now = EnvironmentEdgeManager.currentTime(); 1740 // Do a single read to a local variable to avoid timing issue - HBASE-24454 1741 long ioErrorStartTimeTmp = this.ioErrorStartTime; 1742 if (ioErrorStartTimeTmp > 0) { 1743 if (isCacheEnabled() && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) { 1744 LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration 1745 + "ms, disabling cache, please check your IOEngine"); 1746 disableCache(); 1747 } 1748 } else { 1749 this.ioErrorStartTime = now; 1750 } 1751 } 1752 1753 /** 1754 * Used to shut down the cache -or- turn it off in the case of something broken. 1755 */ 1756 private void disableCache() { 1757 if (!isCacheEnabled()) { 1758 return; 1759 } 1760 LOG.info("Disabling cache"); 1761 cacheState = CacheState.DISABLED; 1762 ioEngine.shutdown(); 1763 this.scheduleThreadPool.shutdown(); 1764 for (int i = 0; i < writerThreads.length; ++i) 1765 writerThreads[i].interrupt(); 1766 this.ramCache.clear(); 1767 if (!ioEngine.isPersistent() || persistencePath == null) { 1768 // Not persistent, so the backingMap cannot be recovered; clear the in-memory state. For a persistent ioengine with a path, we keep it and serialize out the backingMap on shutdown. 1769 this.backingMap.clear(); 1770 this.blocksByHFile.clear(); 1771 this.fullyCachedFiles.clear(); 1772 this.regionCachedSize.clear(); 1773 } 1774 } 1775 1776 private void join() throws InterruptedException { 1777 for (int i = 0; i < writerThreads.length; ++i) 1778 writerThreads[i].join(); 1779 } 1780 1781 @Override 1782 public void shutdown() { 1783 disableCache(); 1784 LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write=" 1785 + persistencePath); 1786 if (ioEngine.isPersistent() && persistencePath != null) { 1787 try { 1788 join(); 1789 if (cachePersister != null) { 1790 LOG.info("Shutting down cache persister thread."); 1791 cachePersister.shutdown(); 1792 while (cachePersister.isAlive()) { 1793 Thread.sleep(10); 1794 } 1795 } 1796 persistToFile(); 1797 } catch (IOException ex) { 1798 LOG.error("Unable to persist data on exit: " + ex.toString(), ex); 1799 } catch (InterruptedException e) { 1800 LOG.warn("Failed to persist data on exit", e); 1801 } 1802 } 1803 } 1804 1805 /** 1806 * Needed mostly for UTs that might run in the same VM and create different BucketCache instances 1807 * on different UT methods.
1808 */ 1809 @Override 1810 protected void finalize() { 1811 if (cachePersister != null && !cachePersister.isInterrupted()) { 1812 cachePersister.interrupt(); 1813 } 1814 } 1815 1816 @Override 1817 public CacheStats getStats() { 1818 return cacheStats; 1819 } 1820 1821 public BucketAllocator getAllocator() { 1822 return this.bucketAllocator; 1823 } 1824 1825 @Override 1826 public long heapSize() { 1827 return this.heapSize.sum(); 1828 } 1829 1830 @Override 1831 public long size() { 1832 return this.realCacheSize.sum(); 1833 } 1834 1835 @Override 1836 public long getCurrentDataSize() { 1837 return size(); 1838 } 1839 1840 @Override 1841 public long getFreeSize() { 1842 if (!isCacheInitialized("BucketCache:getFreeSize")) { 1843 return 0; 1844 } 1845 return this.bucketAllocator.getFreeSize(); 1846 } 1847 1848 @Override 1849 public long getBlockCount() { 1850 return this.blockNumber.sum(); 1851 } 1852 1853 @Override 1854 public long getDataBlockCount() { 1855 return getBlockCount(); 1856 } 1857 1858 @Override 1859 public long getCurrentSize() { 1860 if (!isCacheInitialized("BucketCache::getCurrentSize")) { 1861 return 0; 1862 } 1863 return this.bucketAllocator.getUsedSize(); 1864 } 1865 1866 protected String getAlgorithm() { 1867 return algorithm; 1868 } 1869 1870 /** 1871 * Evicts all blocks for a specific HFile. 1872 * <p> 1873 * This is used for evict-on-close to remove all blocks of a specific HFile. 1874 * @return the number of blocks evicted 1875 */ 1876 @Override 1877 public int evictBlocksByHfileName(String hfileName) { 1878 return evictBlocksRangeByHfileName(hfileName, 0, Long.MAX_VALUE); 1879 } 1880 1881 @Override 1882 public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) { 1883 fileNotFullyCached(hfileName); 1884 Set<BlockCacheKey> keySet = getAllCacheKeysForFile(hfileName, initOffset, endOffset); 1885 LOG.debug("found {} blocks for file {}, starting offset: {}, end offset: {}", keySet.size(), 1886 hfileName, initOffset, endOffset); 1887 int numEvicted = 0; 1888 for (BlockCacheKey key : keySet) { 1889 if (evictBlock(key)) { 1890 ++numEvicted; 1891 } 1892 } 1893 return numEvicted; 1894 } 1895 1896 private Set<BlockCacheKey> getAllCacheKeysForFile(String hfileName, long init, long end) { 1897 return blocksByHFile.subSet(new BlockCacheKey(hfileName, init), true, 1898 new BlockCacheKey(hfileName, end), true); 1899 } 1900 1901 /** 1902 * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each 1903 * priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate 1904 * number of elements out of each according to configuration parameters and their relative sizes. 1905 */ 1906 private class BucketEntryGroup { 1907 1908 private CachedEntryQueue queue; 1909 private long totalSize = 0; 1910 private long bucketSize; 1911 1912 public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) { 1913 this.bucketSize = bucketSize; 1914 queue = new CachedEntryQueue(bytesToFree, blockSize); 1915 totalSize = 0; 1916 } 1917 1918 public void add(Map.Entry<BlockCacheKey, BucketEntry> block) { 1919 totalSize += block.getValue().getLength(); 1920 queue.add(block); 1921 } 1922 1923 public long free(long toFree) { 1924 Map.Entry<BlockCacheKey, BucketEntry> entry; 1925 long freedBytes = 0; 1926 // TODO avoid a cycling situation: we may find no block which is not in use and so have no way to free space. 1927 // What to do then? Caching attempt fail? Need some changes in cacheBlock API?
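// Poll candidate entries off the queue and evict each one that is not referenced by an in-flight RPC, stopping once the requested number of bytes has been freed or the queue is exhausted.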
1928 while ((entry = queue.pollLast()) != null) { 1929 BlockCacheKey blockCacheKey = entry.getKey(); 1930 BucketEntry be = entry.getValue(); 1931 if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) { 1932 freedBytes += be.getLength(); 1933 } 1934 if (freedBytes >= toFree) { 1935 return freedBytes; 1936 } 1937 } 1938 return freedBytes; 1939 } 1940 1941 public long overflow() { 1942 return totalSize - bucketSize; 1943 } 1944 1945 public long totalSize() { 1946 return totalSize; 1947 } 1948 } 1949 1950 /** 1951 * Block entry stored in memory, with key, data and so on. 1952 */ 1953 static class RAMQueueEntry { 1954 private final BlockCacheKey key; 1955 private final Cacheable data; 1956 private long accessCounter; 1957 private boolean inMemory; 1958 private boolean isCachePersistent; 1959 1960 RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory, 1961 boolean isCachePersistent) { 1962 this.key = bck; 1963 this.data = data; 1964 this.accessCounter = accessCounter; 1965 this.inMemory = inMemory; 1966 this.isCachePersistent = isCachePersistent; 1967 } 1968 1969 public Cacheable getData() { 1970 return data; 1971 } 1972 1973 public BlockCacheKey getKey() { 1974 return key; 1975 } 1976 1977 public void access(long accessCounter) { 1978 this.accessCounter = accessCounter; 1979 } 1980 1981 private ByteBuffAllocator getByteBuffAllocator() { 1982 if (data instanceof HFileBlock) { 1983 return ((HFileBlock) data).getByteBuffAllocator(); 1984 } 1985 return ByteBuffAllocator.HEAP; 1986 } 1987 1988 public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc, 1989 final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler, 1990 ByteBuffer metaBuff) throws IOException { 1991 int len = data.getSerializedLength(); 1992 // This cacheable thing can't be serialized 1993 if (len == 0) { 1994 return null; 1995 } 1996 if (isCachePersistent && data instanceof HFileBlock) { 1997 len += Long.BYTES; // we need to record the cache time for consistency check in case of 1998 // recovery 1999 } 2000 long offset = alloc.allocateBlock(len); 2001 boolean succ = false; 2002 BucketEntry bucketEntry = null; 2003 try { 2004 int diskSizeWithHeader = (data instanceof HFileBlock) 2005 ? ((HFileBlock) data).getOnDiskSizeWithHeader() 2006 : data.getSerializedLength(); 2007 bucketEntry = new BucketEntry(offset, len, diskSizeWithHeader, accessCounter, inMemory, 2008 createRecycler, getByteBuffAllocator()); 2009 bucketEntry.setDeserializerReference(data.getDeserializer()); 2010 if (data instanceof HFileBlock) { 2011 // If an instance of HFileBlock, save on some allocations. 2012 HFileBlock block = (HFileBlock) data; 2013 ByteBuff sliceBuf = block.getBufferReadOnly(); 2014 block.getMetaData(metaBuff); 2015 // adds the cache time prior to the block and metadata part 2016 if (isCachePersistent) { 2017 ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); 2018 buffer.putLong(bucketEntry.getCachedTime()); 2019 buffer.rewind(); 2020 ioEngine.write(buffer, offset); 2021 ioEngine.write(sliceBuf, (offset + Long.BYTES)); 2022 } else { 2023 ioEngine.write(sliceBuf, offset); 2024 } 2025 ioEngine.write(metaBuff, offset + len - metaBuff.limit()); 2026 } else { 2027 // Only used for testing.
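// Non-HFileBlock cacheables are serialized into a temporary on-heap buffer and written to the IOEngine in a single call.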
2028 ByteBuffer bb = ByteBuffer.allocate(len); 2029 data.serialize(bb, true); 2030 ioEngine.write(bb, offset); 2031 } 2032 succ = true; 2033 } finally { 2034 if (!succ) { 2035 alloc.freeBlock(offset, len); 2036 } 2037 } 2038 realCacheSize.add(len); 2039 return bucketEntry; 2040 } 2041 } 2042 2043 /** 2044 * Only used in test 2045 */ 2046 void stopWriterThreads() throws InterruptedException { 2047 for (WriterThread writerThread : writerThreads) { 2048 writerThread.disableWriter(); 2049 writerThread.interrupt(); 2050 writerThread.join(); 2051 } 2052 } 2053 2054 @Override 2055 public Iterator<CachedBlock> iterator() { 2056 // Don't bother with ramcache since stuff is in here only a little while. 2057 final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = this.backingMap.entrySet().iterator(); 2058 return new Iterator<CachedBlock>() { 2059 private final long now = System.nanoTime(); 2060 2061 @Override 2062 public boolean hasNext() { 2063 return i.hasNext(); 2064 } 2065 2066 @Override 2067 public CachedBlock next() { 2068 final Map.Entry<BlockCacheKey, BucketEntry> e = i.next(); 2069 return new CachedBlock() { 2070 @Override 2071 public String toString() { 2072 return BlockCacheUtil.toString(this, now); 2073 } 2074 2075 @Override 2076 public BlockPriority getBlockPriority() { 2077 return e.getValue().getPriority(); 2078 } 2079 2080 @Override 2081 public BlockType getBlockType() { 2082 // Not held by BucketEntry. Could add it if wanted on BucketEntry creation. 2083 return null; 2084 } 2085 2086 @Override 2087 public long getOffset() { 2088 return e.getKey().getOffset(); 2089 } 2090 2091 @Override 2092 public long getSize() { 2093 return e.getValue().getLength(); 2094 } 2095 2096 @Override 2097 public long getCachedTime() { 2098 return e.getValue().getCachedTime(); 2099 } 2100 2101 @Override 2102 public String getFilename() { 2103 return e.getKey().getHfileName(); 2104 } 2105 2106 @Override 2107 public int compareTo(CachedBlock other) { 2108 int diff = this.getFilename().compareTo(other.getFilename()); 2109 if (diff != 0) return diff; 2110 2111 diff = Long.compare(this.getOffset(), other.getOffset()); 2112 if (diff != 0) return diff; 2113 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { 2114 throw new IllegalStateException( 2115 "" + this.getCachedTime() + ", " + other.getCachedTime()); 2116 } 2117 return Long.compare(other.getCachedTime(), this.getCachedTime()); 2118 } 2119 2120 @Override 2121 public int hashCode() { 2122 return e.getKey().hashCode(); 2123 } 2124 2125 @Override 2126 public boolean equals(Object obj) { 2127 if (obj instanceof CachedBlock) { 2128 CachedBlock cb = (CachedBlock) obj; 2129 return compareTo(cb) == 0; 2130 } else { 2131 return false; 2132 } 2133 } 2134 }; 2135 } 2136 2137 @Override 2138 public void remove() { 2139 throw new UnsupportedOperationException(); 2140 } 2141 }; 2142 } 2143 2144 @Override 2145 public BlockCache[] getBlockCaches() { 2146 return null; 2147 } 2148 2149 public int getRpcRefCount(BlockCacheKey cacheKey) { 2150 BucketEntry bucketEntry = backingMap.get(cacheKey); 2151 if (bucketEntry != null) { 2152 return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 
0 : 1); 2153 } 2154 return 0; 2155 } 2156 2157 float getAcceptableFactor() { 2158 return acceptableFactor; 2159 } 2160 2161 float getMinFactor() { 2162 return minFactor; 2163 } 2164 2165 float getExtraFreeFactor() { 2166 return extraFreeFactor; 2167 } 2168 2169 float getSingleFactor() { 2170 return singleFactor; 2171 } 2172 2173 float getMultiFactor() { 2174 return multiFactor; 2175 } 2176 2177 float getMemoryFactor() { 2178 return memoryFactor; 2179 } 2180 2181 long getQueueAdditionWaitTime() { 2182 return queueAdditionWaitTime; 2183 } 2184 2185 long getPersistenceChunkSize() { 2186 return persistenceChunkSize; 2187 } 2188 2189 long getBucketcachePersistInterval() { 2190 return bucketcachePersistInterval; 2191 } 2192 2193 public String getPersistencePath() { 2194 return persistencePath; 2195 } 2196 2197 /** 2198 * Wraps the delegate ConcurrentMap while maintaining the reference count of each block. 2199 */ 2200 static class RAMCache { 2201 /** 2202 * The map is declared as {@link ConcurrentHashMap} explicitly here, because in 2203 * {@link RAMCache#get(BlockCacheKey)} and 2204 * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)} we need to guarantee 2205 * the atomicity of map#computeIfPresent(key, func) and map#putIfAbsent(key, func). Besides, the 2206 * func must execute exactly once, only when the key is present (or absent) and under the 2207 * lock context. Otherwise, the reference count of the block will be messed up. Notice that 2208 * {@link java.util.concurrent.ConcurrentSkipListMap} cannot guarantee that. 2209 */ 2210 final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>(); 2211 2212 public boolean containsKey(BlockCacheKey key) { 2213 return delegate.containsKey(key); 2214 } 2215 2216 public RAMQueueEntry get(BlockCacheKey key) { 2217 return delegate.computeIfPresent(key, (k, re) -> { 2218 // It'll be referenced by RPC, so retain atomically here. If the get and retain are not 2219 // atomic, another thread may remove and release the block, and when retaining in this thread we 2220 // may retain a block with refCnt=0, which is disallowed. (see HBASE-22422) 2221 re.getData().retain(); 2222 return re; 2223 }); 2224 } 2225 2226 /** 2227 * Return the previous associated value, or null if absent. It has the same meaning as 2228 * {@link ConcurrentMap#putIfAbsent(Object, Object)} 2229 */ 2230 public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) { 2231 AtomicBoolean absent = new AtomicBoolean(false); 2232 RAMQueueEntry re = delegate.computeIfAbsent(key, k -> { 2233 // The RAMCache now references this entry, so its reference count should be incremented. 2234 entry.getData().retain(); 2235 absent.set(true); 2236 return entry; 2237 }); 2238 return absent.get() ? null : re; 2239 } 2240 2241 public boolean remove(BlockCacheKey key) { 2242 return remove(key, re -> { 2243 }); 2244 } 2245 2246 /** 2247 * A {@link Consumer} is accepted here because, once the removed entry releases its reference count, 2248 * its ByteBuffers may be recycled and accessing them outside this method will throw an 2249 * exception. The consumer accesses the entry to be removed before its reference count is released.
2250 * Notice, don't change its reference count in the {@link Consumer} 2251 */ 2252 public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) { 2253 RAMQueueEntry previous = delegate.remove(key); 2254 action.accept(previous); 2255 if (previous != null) { 2256 previous.getData().release(); 2257 } 2258 return previous != null; 2259 } 2260 2261 public boolean isEmpty() { 2262 return delegate.isEmpty(); 2263 } 2264 2265 public void clear() { 2266 Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator(); 2267 while (it.hasNext()) { 2268 RAMQueueEntry re = it.next().getValue(); 2269 it.remove(); 2270 re.getData().release(); 2271 } 2272 } 2273 2274 public boolean hasBlocksForFile(String fileName) { 2275 return delegate.keySet().stream().filter(key -> key.getHfileName().equals(fileName)) 2276 .findFirst().isPresent(); 2277 } 2278 } 2279 2280 public Map<BlockCacheKey, BucketEntry> getBackingMap() { 2281 return backingMap; 2282 } 2283 2284 public AtomicBoolean getBackingMapValidated() { 2285 return backingMapValidated; 2286 } 2287 2288 @Override 2289 public Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() { 2290 return Optional.of(fullyCachedFiles); 2291 } 2292 2293 public static Optional<BucketCache> getBucketCacheFromCacheConfig(CacheConfig cacheConf) { 2294 if (cacheConf.getBlockCache().isPresent()) { 2295 BlockCache bc = cacheConf.getBlockCache().get(); 2296 if (bc instanceof CombinedBlockCache) { 2297 BlockCache l2 = ((CombinedBlockCache) bc).getSecondLevelCache(); 2298 if (l2 instanceof BucketCache) { 2299 return Optional.of((BucketCache) l2); 2300 } 2301 } else if (bc instanceof BucketCache) { 2302 return Optional.of((BucketCache) bc); 2303 } 2304 } 2305 return Optional.empty(); 2306 } 2307 2308 @Override 2309 public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int dataBlockCount, 2310 long size) { 2311 // block eviction may be happening in the background as prefetch runs, 2312 // so we need to count all blocks for this file in the backing map under 2313 // a read lock for the block offset 2314 final List<ReentrantReadWriteLock> locks = new ArrayList<>(); 2315 LOG.debug("Notifying caching completed for file {}, with total blocks {}, and data blocks {}", 2316 fileName, totalBlockCount, dataBlockCount); 2317 try { 2318 final MutableInt count = new MutableInt(); 2319 LOG.debug("iterating over {} entries in the backing map", backingMap.size()); 2320 Set<BlockCacheKey> result = getAllCacheKeysForFile(fileName.getName(), 0, Long.MAX_VALUE); 2321 if (result.isEmpty() && StoreFileInfo.isReference(fileName)) { 2322 result = getAllCacheKeysForFile( 2323 StoreFileInfo.getReferredToRegionAndFile(fileName.getName()).getSecond(), 0, 2324 Long.MAX_VALUE); 2325 } 2326 result.stream().forEach(entry -> { 2327 LOG.debug("found block for file {} in the backing map. Acquiring read lock for offset {}", 2328 fileName.getName(), entry.getOffset()); 2329 ReentrantReadWriteLock lock = offsetLock.getLock(entry.getOffset()); 2330 lock.readLock().lock(); 2331 locks.add(lock); 2332 if (backingMap.containsKey(entry) && entry.getBlockType().isData()) { 2333 count.increment(); 2334 } 2335 }); 2336 // BucketCache would only have data blocks 2337 if (dataBlockCount == count.getValue()) { 2338 LOG.debug("File {} has now been fully cached.", fileName); 2339 fileCacheCompleted(fileName, size); 2340 } else { 2341 LOG.debug( 2342 "Prefetch executor completed for {}, but only {} data blocks were cached. " 2343 + "Total data blocks for file: {}. 
" 2344 + "Checking for blocks pending cache in cache writer queue.", 2345 fileName, count.getValue(), dataBlockCount); 2346 if (ramCache.hasBlocksForFile(fileName.getName())) { 2347 for (ReentrantReadWriteLock lock : locks) { 2348 lock.readLock().unlock(); 2349 } 2350 locks.clear(); 2351 LOG.debug("There are still blocks pending caching for file {}. Will sleep 100ms " 2352 + "and try the verification again.", fileName); 2353 Thread.sleep(100); 2354 notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size); 2355 } else { 2356 LOG.info( 2357 "The total block count was {}. We found only {} data blocks cached from " 2358 + "a total of {} data blocks for file {}, " 2359 + "but no blocks pending caching. Maybe cache is full or evictions " 2360 + "happened concurrently to cache prefetch.", 2361 totalBlockCount, count, dataBlockCount, fileName); 2362 } 2363 } 2364 } catch (InterruptedException e) { 2365 throw new RuntimeException(e); 2366 } finally { 2367 for (ReentrantReadWriteLock lock : locks) { 2368 lock.readLock().unlock(); 2369 } 2370 } 2371 } 2372 2373 @Override 2374 public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) { 2375 if (!isCacheInitialized("blockFitsIntoTheCache")) { 2376 return Optional.of(false); 2377 } 2378 2379 long currentUsed = bucketAllocator.getUsedSize(); 2380 boolean result = (currentUsed + block.getOnDiskSizeWithHeader()) < acceptableSize(); 2381 return Optional.of(result); 2382 } 2383 2384 @Override 2385 public Optional<Boolean> shouldCacheFile(String fileName) { 2386 // if we don't have the file in fullyCachedFiles, we should cache it 2387 return Optional.of(!fullyCachedFiles.containsKey(fileName)); 2388 } 2389 2390 @Override 2391 public Optional<Boolean> isAlreadyCached(BlockCacheKey key) { 2392 boolean foundKey = backingMap.containsKey(key); 2393 // if there's no entry for the key itself, we need to check if this key is for a reference, 2394 // and if so, look for a block from the referenced file using this getBlockForReference method. 2395 return Optional.of(foundKey ? true : getBlockForReference(key) != null); 2396 } 2397 2398 @Override 2399 public Optional<Integer> getBlockSize(BlockCacheKey key) { 2400 BucketEntry entry = backingMap.get(key); 2401 if (entry == null) { 2402 // the key might be for a reference tha we had found the block from the referenced file in 2403 // the cache when we first tried to cache it. 2404 entry = getBlockForReference(key); 2405 return entry == null ? Optional.empty() : Optional.of(entry.getOnDiskSizeWithHeader()); 2406 } else { 2407 return Optional.of(entry.getOnDiskSizeWithHeader()); 2408 } 2409 2410 } 2411 2412 boolean isCacheInitialized(String api) { 2413 if (cacheState == CacheState.INITIALIZING) { 2414 LOG.warn("Bucket initialisation pending at {}", api); 2415 return false; 2416 } 2417 return true; 2418 } 2419 2420 @Override 2421 public boolean waitForCacheInitialization(long timeout) { 2422 try { 2423 while (cacheState == CacheState.INITIALIZING) { 2424 if (timeout <= 0) { 2425 break; 2426 } 2427 Thread.sleep(100); 2428 timeout -= 100; 2429 } 2430 } finally { 2431 return isCacheEnabled(); 2432 } 2433 } 2434}