/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef;
import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool;
import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool.ReferenceType;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;

/**
 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses BucketCache#ramCache
 * and BucketCache#backingMap in order to determine if a given element is in the cache. The bucket
 * cache can use off-heap memory {@link ByteBufferIOEngine} or mmap
 * {@link ExclusiveMemoryMmapIOEngine} or pmem {@link SharedMemoryMmapIOEngine} or local files
 * {@link FileIOEngine} to store/read the block data.
 * <p>
 * Eviction is via a similar algorithm as used in
 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}
 * <p>
 * BucketCache can be used mainly as a block cache (see
 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with a BlockCache to
 * decrease CMS GC and heap fragmentation.
 * <p>
 * It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to
 * enlarge cache space via a victim cache.
 */
@InterfaceAudience.Private
public class BucketCache implements BlockCache, HeapSize {
  private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class);

  /** Priority buckets config */
  static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor";
  static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor";
  static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor";
  static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
  static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
  static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";
  static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE =
    "hbase.bucketcache.persistence.chunksize";

  /** Use strong reference for offsetLock or not */
  private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref";
  private static final boolean STRONG_REF_DEFAULT = false;

  /** The cache age of blocks to check if the related file is present on any online regions. */
  static final String BLOCK_ORPHAN_GRACE_PERIOD =
    "hbase.bucketcache.block.orphan.evictgraceperiod.seconds";

  static final long BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT = 24 * 60 * 60 * 1000L;

  /** Priority buckets */
  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  static final float DEFAULT_MULTI_FACTOR = 0.50f;
  static final float DEFAULT_MEMORY_FACTOR = 0.25f;
  static final float DEFAULT_MIN_FACTOR = 0.85f;

  private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
  private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;

  // Number of blocks to clear for each of the bucket size that is full
  private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;

  /** Statistics thread */
  private static final int statThreadPeriod = 5 * 60;

  final static int DEFAULT_WRITER_THREADS = 3;
  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;

  final static long DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE = 10000;

  // Store/read block data
  transient final IOEngine ioEngine;

  // Store the block in this map before writing it to cache
  transient final RAMCache ramCache;

  // In this map, store the block's meta data like offset, length
  transient Map<BlockCacheKey, BucketEntry> backingMap;

  private AtomicBoolean backingMapValidated = new AtomicBoolean(false);

  /**
   * Map of hFile -> Region -> File size. This map is used to track all files that completed
   * prefetch, together with the region they belong to and the total cached size for that region.
   */
  final Map<String, Pair<String, Long>> fullyCachedFiles = new ConcurrentHashMap<>();
  /**
   * Map of region -> total size of the region prefetched on this region server. This is the total
   * size of hFiles for this region prefetched on this region server
   */
  final Map<String, Long> regionCachedSize = new ConcurrentHashMap<>();

  private BucketCachePersister cachePersister;

  /**
   * Enum to represent the state of cache
   */
  protected enum CacheState {
    // Initializing: State when the cache is being initialised from persistence.
    INITIALIZING,
    // Enabled: State when cache is initialised and is ready.
    ENABLED,
    // Disabled: State when the cache is disabled.
    DISABLED
  }

  /**
   * Flag if the cache is enabled or not... We shut it off if there are IO errors for some time, so
   * that Bucket IO exceptions/errors don't bring down the HBase server.
   */
  private volatile CacheState cacheState;

  /**
   * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other
   * words, the work adding blocks to the BucketCache is divided up amongst the running
   * WriterThreads. It's done by taking the hash of the cache key modulo the queue count. A
   * WriterThread, when it runs, takes whatever has been recently added and 'drains' the entries
   * to the BucketCache. It then updates the ramCache and backingMap accordingly.
   */
  transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>();
  transient final WriterThread[] writerThreads;

  /** Volatile boolean to track if free space is in process or not */
  private volatile boolean freeInProgress = false;
  private transient final Lock freeSpaceLock = new ReentrantLock();

  private final LongAdder realCacheSize = new LongAdder();
  private final LongAdder heapSize = new LongAdder();
  /** Current number of cached elements */
  private final LongAdder blockNumber = new LongAdder();

  /** Cache access count (sequential ID) */
  private final AtomicLong accessCount = new AtomicLong();

  private static final int DEFAULT_CACHE_WAIT_TIME = 50;

  private final BucketCacheStats cacheStats = new BucketCacheStats();
  private final String persistencePath;
  static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false);
  private final long cacheCapacity;
  /** Approximate block size */
  private final long blockSize;

  /** Duration of IO errors tolerated before we disable cache, 1 min as default */
  private final int ioErrorsTolerationDuration;
  // 1 min
  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;

  // Start time of first IO error when reading or writing IO Engine, it will be
  // reset after a successful read/write.
  private volatile long ioErrorStartTime = -1;

  private Configuration conf;

  /**
   * A ReentrantReadWriteLock to lock on a particular block identified by offset. The purpose of
   * this is to avoid freeing the block which is being read.
   * <p>
   */
  transient final IdReadWriteLock<Long> offsetLock;

  NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>(
    Comparator.comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private transient final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());

  // Allocate or free space for the block
  private transient BucketAllocator bucketAllocator;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private float minFactor;

  /**
   * Free this floating point factor of extra blocks when evicting. For example, free the number of
   * blocks requested * (1 + extraFreeFactor)
   */
  private float extraFreeFactor;

  /** Single access bucket size */
  private float singleFactor;

  /** Multiple access bucket size */
  private float multiFactor;

  /** In-memory bucket size */
  private float memoryFactor;

  private long bucketcachePersistInterval;

  private static final String FILE_VERIFY_ALGORITHM =
    "hbase.bucketcache.persistent.file.integrity.check.algorithm";
  private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";

  public static final String QUEUE_ADDITION_WAIT_TIME = "hbase.bucketcache.queue.addition.waittime";
  private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
  private long queueAdditionWaitTime;
  /**
   * Use {@link java.security.MessageDigest} class's digest algorithms to check persistent file
   * integrity, default algorithm is MD5
   */
  private String algorithm;

  private long persistenceChunkSize;

  /* Tracing failed Bucket Cache allocations. */
  private long allocFailLogPrevTs; // time of previous log event for allocation failure.
  private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute.

  private Map<String, HRegion> onlineRegions;

  private long orphanBlockGracePeriod = 0;

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
      persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create());
  }

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
    Configuration conf) throws IOException {
    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
      persistencePath, ioErrorsTolerationDuration, conf, null);
  }

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
    Configuration conf, Map<String, HRegion> onlineRegions) throws IOException {
    Preconditions.checkArgument(blockSize > 0,
      "BucketCache blockSize is set to " + blockSize + ", it must be greater than 0");
    boolean useStrongRef = conf.getBoolean(STRONG_REF_KEY, STRONG_REF_DEFAULT);
    if (useStrongRef) {
      this.offsetLock = new IdReadWriteLockStrongRef<>();
    } else {
      this.offsetLock = new IdReadWriteLockWithObjectPool<>(ReferenceType.SOFT);
    }
    this.conf = conf;
    this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
    this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
    this.writerThreads = new WriterThread[writerThreadNum];
    this.onlineRegions = onlineRegions;
    this.orphanBlockGracePeriod =
      conf.getLong(BLOCK_ORPHAN_GRACE_PERIOD, BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT);
    long blockNumCapacity = capacity / blockSize;
    if (blockNumCapacity >= Integer.MAX_VALUE) {
      // Enough for about 32TB of cache!
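      // (The number of blocks, capacity / blockSize, must stay below Integer.MAX_VALUE;
      // e.g. at a 16 KB blockSize that is roughly 2^31 * 16 KB, i.e. about 32 TB of cache.)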
      throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
    }

    this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR);
    this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR);
    this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR);
    this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
    this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
    this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
    this.queueAdditionWaitTime =
      conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
    this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);
    this.persistenceChunkSize =
      conf.getLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE);
    if (this.persistenceChunkSize <= 0) {
      persistenceChunkSize = DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
    }

    sanityCheckConfigs();

    LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor
      + ", minFactor: " + minFactor + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: "
      + singleFactor + ", multiFactor: " + multiFactor + ", memoryFactor: " + memoryFactor
      + ", useStrongRef: " + useStrongRef);

    this.cacheCapacity = capacity;
    this.persistencePath = persistencePath;
    this.blockSize = blockSize;
    this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
    this.cacheState = CacheState.INITIALIZING;

    this.allocFailLogPrevTs = 0;

    for (int i = 0; i < writerThreads.length; ++i) {
      writerQueues.add(new ArrayBlockingQueue<>(writerQLen));
    }

    assert writerQueues.size() == writerThreads.length;
    this.ramCache = new RAMCache();

    this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);
    instantiateWriterThreads();

    if (isCachePersistent()) {
      if (ioEngine instanceof FileIOEngine) {
        startBucketCachePersisterThread();
      }
      startPersistenceRetriever(bucketSizes, capacity);
    } else {
      bucketAllocator = new BucketAllocator(capacity, bucketSizes);
      this.cacheState = CacheState.ENABLED;
      startWriterThreads();
    }

    // Run the statistics thread periodically to print the cache statistics log
    // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
    // every five minutes.
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod,
      statThreadPeriod, TimeUnit.SECONDS);
    LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity="
      + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize)
      + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath="
      + persistencePath + ", bucketAllocator=" + BucketAllocator.class.getName());
  }

  private void startPersistenceRetriever(int[] bucketSizes, long capacity) {
    Runnable persistentCacheRetriever = () -> {
      try {
        retrieveFromFile(bucketSizes);
        LOG.info("Persistent bucket cache recovery from {} is complete.", persistencePath);
      } catch (Throwable ex) {
        LOG.warn("Can't restore from file[{}]. The bucket cache will be reset and rebuilt."
          + " Exception seen: ", persistencePath, ex);
        backingMap.clear();
        fullyCachedFiles.clear();
        backingMapValidated.set(true);
        regionCachedSize.clear();
        try {
          bucketAllocator = new BucketAllocator(capacity, bucketSizes);
        } catch (BucketAllocatorException allocatorException) {
          LOG.error("Exception during Bucket Allocation", allocatorException);
        }
      } finally {
        this.cacheState = CacheState.ENABLED;
        startWriterThreads();
      }
    };
    Thread t = new Thread(persistentCacheRetriever);
    t.start();
  }

  private void sanityCheckConfigs() {
    Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0,
      ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0,
      MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(minFactor <= acceptableFactor,
      MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME);
    Preconditions.checkArgument(extraFreeFactor >= 0,
      EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0");
    Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0,
      SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0,
      MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0,
      MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
    Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1,
      SINGLE_FACTOR_CONFIG_NAME + ", " + MULTI_FACTOR_CONFIG_NAME + ", and "
        + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0");
  }

  /**
   * Called by the constructor to instantiate the writer threads.
   */
  private void instantiateWriterThreads() {
    final String threadName = Thread.currentThread().getName();
    for (int i = 0; i < this.writerThreads.length; ++i) {
      this.writerThreads[i] = new WriterThread(this.writerQueues.get(i));
      this.writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
      this.writerThreads[i].setDaemon(true);
    }
  }

  /**
   * Called by the constructor to start the writer threads. Used by tests that need to override
   * starting the threads.
   */
  protected void startWriterThreads() {
    for (WriterThread thread : writerThreads) {
      thread.start();
    }
  }

  void startBucketCachePersisterThread() {
    LOG.info("Starting BucketCachePersisterThread");
    cachePersister = new BucketCachePersister(this, bucketcachePersistInterval);
    cachePersister.setDaemon(true);
    cachePersister.start();
  }

  @Override
  public boolean isCacheEnabled() {
    return this.cacheState == CacheState.ENABLED;
  }

  @Override
  public long getMaxSize() {
    return this.cacheCapacity;
  }

  public String getIoEngine() {
    return ioEngine.toString();
  }

  /**
   * Get the IOEngine from the IO engine name
   * @return the IOEngine
   */
  private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath)
    throws IOException {
    if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
      // To keep the usage simple, we only document the prefix 'files:' whether there is one file
      // or multiple files, but 'file:' is also supported for compatibility.
      String[] filePaths =
        ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER);
      return new FileIOEngine(capacity, persistencePath != null, filePaths);
    } else if (ioEngineName.startsWith("offheap")) {
      return new ByteBufferIOEngine(capacity);
    } else if (ioEngineName.startsWith("mmap:")) {
      return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
    } else if (ioEngineName.startsWith("pmem:")) {
      // This mode of bucket cache creates an IOEngine over a file on the persistent memory
      // device. Since the persistent memory device has its own address space, the contents
      // mapped to this address space do not get swapped out like in the case of mmapping
      // on to DRAM. Hence the cells created out of the hfile blocks in the pmem bucket cache
      // can be directly referred to without having to copy them onheap. Once the RPC is done,
      // the blocks can be returned back as in case of ByteBufferIOEngine.
      return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity);
    } else {
      throw new IllegalArgumentException(
        "Don't understand io engine name for cache- prefix with file:, files:, mmap:, pmem: or offheap");
    }
  }

  public boolean isCachePersistenceEnabled() {
    return persistencePath != null;
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey block's cache key
   * @param buf      block buffer
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    cacheBlock(cacheKey, buf, false);
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey   block's cache key
   * @param cachedItem block buffer
   * @param inMemory   if block is in-memory
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
    cacheBlockWithWait(cacheKey, cachedItem, inMemory, false);
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey   block's cache key
   * @param cachedItem block buffer
   * @param inMemory   if block is in-memory
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
    boolean waitWhenCache) {
    cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0);
  }

  /**
   * Cache the block to ramCache
   * @param cacheKey   block's cache key
   * @param cachedItem block buffer
   * @param inMemory   if block is in-memory
   * @param wait       if true, blocking wait when queue is full
   */
  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
    boolean wait) {
    if (isCacheEnabled()) {
      if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
        if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) {
          BucketEntry bucketEntry = backingMap.get(cacheKey);
          if (bucketEntry != null && bucketEntry.isRpcRef()) {
            // avoid replace when there are RPC refs for the bucket entry in bucket cache
            return;
          }
          cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
        }
      } else {
        cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
      }
    }
  }

  protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
    return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock);
  }

  protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
    boolean inMemory, boolean wait) {
    if (!isCacheEnabled()) {
      return;
    }
    if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) {
      cacheKey.setBlockType(cachedItem.getBlockType());
    }
    LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
    // Stuff the entry into the RAM cache so it can get drained to the persistent store
    RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(),
      inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine);
    /**
     * Don't use ramCache.put(cacheKey, re) here, because there may be an existing entry with the
     * same key in ramCache, and the heap size of the bucket cache needs to be updated if we
     * replace an entry in ramCache. But the WriterThread also removes entries from ramCache and
     * updates the heap size; if we used ramCache.put(), the entry removed by the WriterThread
     * might not be the correct one, and the heap size would get messed up (HBASE-20789).
     */
    if (ramCache.putIfAbsent(cacheKey, re) != null) {
      return;
    }
    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
    boolean successfulAddition = false;
    if (wait) {
      try {
        successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    } else {
      successfulAddition = bq.offer(re);
    }
    if (!successfulAddition) {
      LOG.debug("Failed to insert block {} into the cache writers queue", cacheKey);
      ramCache.remove(cacheKey);
      cacheStats.failInsert();
    } else {
      this.blockNumber.increment();
      this.heapSize.add(cachedItem.heapSize());
    }
  }

  /**
   * If the passed cache key relates to a reference (<hfile>.<parentEncRegion>), this
   * method looks for the block of the referred file in the cache. If present in the cache, the
   * block for the referred file is returned; otherwise, this method returns null. It will also
   * return null if the passed cache key doesn't relate to a reference.
   * @param key the BlockCacheKey instance to look for in the cache.
   * @return the cached block from the referred file, null if there's no such block in the cache or
   *         the passed key doesn't relate to a reference.
   */
  public BucketEntry getBlockForReference(BlockCacheKey key) {
    BucketEntry foundEntry = null;
    String referredFileName = null;
    if (StoreFileInfo.isReference(key.getHfileName())) {
      referredFileName = StoreFileInfo.getReferredToRegionAndFile(key.getHfileName()).getSecond();
    }
    if (referredFileName != null) {
      BlockCacheKey convertedCacheKey = new BlockCacheKey(referredFileName, key.getOffset());
      foundEntry = backingMap.get(convertedCacheKey);
      LOG.debug("Got a link/ref: {}. Related cacheKey: {}. Found entry: {}", key.getHfileName(),
        convertedCacheKey, foundEntry);
    }
    return foundEntry;
  }

  /**
   * Get the buffer of the block with the specified key.
   * @param key                block's cache key
   * @param caching            true if the caller caches blocks on cache misses
   * @param repeat             Whether this is a repeat lookup for the same block
   * @param updateCacheMetrics Whether we should update cache metrics or not
   * @return buffer of specified cache key, or null if not in cache
   */
  @Override
  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
    boolean updateCacheMetrics) {
    if (!isCacheEnabled()) {
      cacheStats.miss(caching, key.isPrimary(), key.getBlockType());
      return null;
    }
    RAMQueueEntry re = ramCache.get(key);
    if (re != null) {
      if (updateCacheMetrics) {
        cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
      }
      re.access(accessCount.incrementAndGet());
      return re.getData();
    }
    BucketEntry bucketEntry = backingMap.get(key);
    if (bucketEntry == null) {
      bucketEntry = getBlockForReference(key);
    }
    if (bucketEntry != null) {
      long start = System.nanoTime();
      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
      try {
        lock.readLock().lock();
        // We can not read here even if backingMap does contain the given key because its offset
        // may be changed. If we lock BlockCacheKey instead of offset, then we can only check
        // existence here.
        if (
          bucketEntry.equals(backingMap.get(key)) || bucketEntry.equals(getBlockForReference(key))
        ) {
          // Read the block from IOEngine based on the bucketEntry's offset and length. NOTICE: the
          // block will use the refCnt of bucketEntry, which means if two HFileBlocks map to
          // the same BucketEntry, then all three will share the same refCnt.
          Cacheable cachedBlock = ioEngine.read(bucketEntry);
          if (ioEngine.usesSharedMemory()) {
            // If IOEngine uses shared memory, cachedBlock and BucketEntry will share the
            // same RefCnt, do retain here, in order to count the number of RPC references
            cachedBlock.retain();
          }
          // Update the cache statistics.
          if (updateCacheMetrics) {
            cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
            cacheStats.ioHit(System.nanoTime() - start);
          }
          bucketEntry.access(accessCount.incrementAndGet());
          if (this.ioErrorStartTime > 0) {
            ioErrorStartTime = -1;
          }
          return cachedBlock;
        }
      } catch (HBaseIOException hioex) {
        // When using file io engine persistent cache,
        // the cache map state might differ from the actual cache. If we reach this block,
        // we should remove the cache key entry from the backing map
        backingMap.remove(key);
        fullyCachedFiles.remove(key.getHfileName());
        LOG.debug("Failed to fetch block for cache key: {}.", key, hioex);
      } catch (IOException ioex) {
        LOG.error("Failed reading block " + key + " from bucket cache", ioex);
        checkIOErrorIsTolerated();
      } finally {
        lock.readLock().unlock();
      }
    }
    if (!repeat && updateCacheMetrics) {
      cacheStats.miss(caching, key.isPrimary(), key.getBlockType());
    }
    return null;
  }

  /**
   * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap}
   */
  void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber,
    boolean evictedByEvictionProcess) {
    bucketEntry.markAsEvicted();
    blocksByHFile.remove(cacheKey);
    if (decrementBlockNumber) {
      this.blockNumber.decrement();
      if (ioEngine.isPersistent()) {
        fileNotFullyCached(cacheKey.getHfileName());
      }
    }
    if (evictedByEvictionProcess) {
      cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
    }
    if (ioEngine.isPersistent()) {
      setCacheInconsistent(true);
    }
  }

  private void fileNotFullyCached(String hfileName) {
    // Update the regionPrefetchedSizeMap before removing the file from prefetchCompleted
    if (fullyCachedFiles.containsKey(hfileName)) {
      Pair<String, Long> regionEntry = fullyCachedFiles.get(hfileName);
      String regionEncodedName = regionEntry.getFirst();
      long filePrefetchSize = regionEntry.getSecond();
      LOG.debug("Removing file {} for region {}", hfileName, regionEncodedName);
      regionCachedSize.computeIfPresent(regionEncodedName, (rn, pf) -> pf - filePrefetchSize);
      // If all the blocks for a region are evicted from the cache, remove the entry for that region
      if (
        regionCachedSize.containsKey(regionEncodedName)
          && regionCachedSize.get(regionEncodedName) == 0
      ) {
        regionCachedSize.remove(regionEncodedName);
      }
    }
    fullyCachedFiles.remove(hfileName);
  }

  public void fileCacheCompleted(Path filePath, long size) {
    Pair<String, Long> pair = new Pair<>();
    // sets the region name
    String regionName = filePath.getParent().getParent().getName();
    pair.setFirst(regionName);
    pair.setSecond(size);
    fullyCachedFiles.put(filePath.getName(), pair);
  }

  private void updateRegionCachedSize(Path filePath, long cachedSize) {
    if (filePath != null) {
      String regionName = filePath.getParent().getParent().getName();
      regionCachedSize.merge(regionName, cachedSize,
        (previousSize, newBlockSize) -> previousSize + newBlockSize);
    }
  }

  /**
   * Actually free the {@link BucketEntry}, which can only be invoked when the
   * {@link BucketEntry#refCnt} becomes 0.
   */
  void freeBucketEntry(BucketEntry bucketEntry) {
    bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
    realCacheSize.add(-1 * bucketEntry.getLength());
  }

  /**
   * Try to evict the block from {@link BlockCache} by force. We'll call this in a few cases:<br>
   * 1. Close an HFile, and clear all cached blocks. <br>
   * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br>
   * <p>
   * Firstly, we'll try to remove the block from RAMCache, and then try to evict from backingMap.
   * Here we evict the block from backingMap immediately, but only free the reference from bucket
   * cache by calling {@link BucketEntry#markedAsEvicted}. If there are still RPCs referring to
   * this block, it can only be de-allocated when all of them release the block.
   * <p>
   * NOTICE: we need to grab the write offset lock first before releasing the reference from
   * bucket cache. If we don't, we may read a {@link BucketEntry} with refCnt = 0 in
   * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, which is a memory
   * leak.
   * @param cacheKey Block to evict
   * @return true to indicate whether we've evicted successfully or not.
   */
  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    return doEvictBlock(cacheKey, null, false);
  }

  /**
   * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap}
   * and {@link BucketCache#ramCache}. <br/>
   * NOTE: When evicting from {@link BucketCache#backingMap}, only the matched
   * {@link BlockCacheKey} and {@link BucketEntry} could be removed.
   * @param cacheKey    {@link BlockCacheKey} to evict.
   * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict.
   * @return true to indicate whether we've evicted successfully or not.
   */
  private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
    boolean evictedByEvictionProcess) {
    if (!isCacheEnabled()) {
      return false;
    }
    boolean existedInRamCache = removeFromRamCache(cacheKey);
    if (bucketEntry == null) {
      bucketEntry = backingMap.get(cacheKey);
    }
    final BucketEntry bucketEntryToUse = bucketEntry;

    if (bucketEntryToUse == null) {
      if (existedInRamCache && evictedByEvictionProcess) {
        cacheStats.evicted(0, cacheKey.isPrimary());
      }
      return existedInRamCache;
    } else {
      return bucketEntryToUse.withWriteLock(offsetLock, () -> {
        if (backingMap.remove(cacheKey, bucketEntryToUse)) {
          LOG.debug("removed key {} from back map with offset lock {} in the evict process",
            cacheKey, bucketEntryToUse.offset());
          blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess);
          return true;
        }
        return false;
      });
    }
  }

  /**
   * <pre>
   * Create the {@link Recycler} for {@link BucketEntry#refCnt}, which would be used as
   * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}.
   * NOTE: for {@link BucketCache#getBlock}, the {@link RefCnt#recycler} of {@link HFileBlock#buf}
   * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different:
   * 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap},
   * it is the return value of current {@link BucketCache#createRecycler} method.
   *
   * 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache},
   * it is {@link ByteBuffAllocator#putbackBuffer}.
   * </pre>
   */
  public Recycler createRecycler(final BucketEntry bucketEntry) {
    return () -> {
      freeBucketEntry(bucketEntry);
      return;
    };
  }

  /**
   * NOTE: This method is only for test.
   */
  public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) {
    BucketEntry bucketEntry = backingMap.get(blockCacheKey);
    if (bucketEntry == null) {
      return false;
    }
    return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry);
  }

  /**
   * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if
   * {@link BucketEntry#isRpcRef} is false. <br/>
   * NOTE: When evicting from {@link BucketCache#backingMap}, only the matched
   * {@link BlockCacheKey} and {@link BucketEntry} could be removed.
   * @param blockCacheKey {@link BlockCacheKey} to evict.
   * @param bucketEntry   {@link BucketEntry} matched {@link BlockCacheKey} to evict.
   * @return true to indicate whether we've evicted successfully or not.
   */
  boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) {
    if (!bucketEntry.isRpcRef()) {
      return doEvictBlock(blockCacheKey, bucketEntry, true);
    }
    return false;
  }

  protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
    return ramCache.remove(cacheKey, re -> {
      if (re != null) {
        this.blockNumber.decrement();
        this.heapSize.add(-1 * re.getData().heapSize());
      }
    });
  }

  public boolean isCacheInconsistent() {
    return isCacheInconsistent.get();
  }

  public void setCacheInconsistent(boolean setCacheInconsistent) {
    isCacheInconsistent.set(setCacheInconsistent);
  }

  protected void setCacheState(CacheState state) {
    cacheState = state;
  }

  /*
   * Statistics thread. Periodically output cache statistics to the log.
   */
  private static class StatisticsThread extends Thread {
    private final BucketCache bucketCache;

    public StatisticsThread(BucketCache bucketCache) {
      super("BucketCacheStatsThread");
      setDaemon(true);
      this.bucketCache = bucketCache;
    }

    @Override
    public void run() {
      bucketCache.logStats();
    }
  }

  public void logStats() {
    if (!isCacheInitialized("BucketCache::logStats")) {
      return;
    }

    long totalSize = bucketAllocator.getTotalSize();
    long usedSize = bucketAllocator.getUsedSize();
    long freeSize = totalSize - usedSize;
    long cacheSize = getRealCacheSize();
    LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + "totalSize="
      + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", "
      + "usedSize=" + StringUtils.byteDesc(usedSize) + ", " + "cacheSize="
      + StringUtils.byteDesc(cacheSize) + ", " + "accesses=" + cacheStats.getRequestCount() + ", "
      + "hits=" + cacheStats.getHitCount() + ", " + "IOhitsPerSecond="
      + cacheStats.getIOHitsPerSecond() + ", " + "IOTimePerHit="
      + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " + "hitRatio="
      + (cacheStats.getHitCount() == 0
        ? "0,"
        : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", "))
      + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits="
      + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio="
      + (cacheStats.getHitCachingCount() == 0
        ? "0,"
        : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", "))
      + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted="
      + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction()
      + ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount() + ", blocksCount="
      + backingMap.size());
    cacheStats.reset();

    bucketAllocator.logDebugStatistics();
  }

  public long getRealCacheSize() {
    return this.realCacheSize.sum();
  }

  public long acceptableSize() {
    if (!isCacheInitialized("BucketCache::acceptableSize")) {
      return 0;
    }
    return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor);
  }

  long getPartitionSize(float partitionFactor) {
    if (!isCacheInitialized("BucketCache::getPartitionSize")) {
      return 0;
    }

    return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor);
  }

  /**
   * Return the count of bucketSizeInfos that still need free space
   */
  private int bucketSizesAboveThresholdCount(float minFactor) {
    if (!isCacheInitialized("BucketCache::bucketSizesAboveThresholdCount")) {
      return 0;
    }

    BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
    int fullCount = 0;
    for (int i = 0; i < stats.length; i++) {
      long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
      freeGoal = Math.max(freeGoal, 1);
      if (stats[i].freeCount() < freeGoal) {
        fullCount++;
      }
    }
    return fullCount;
  }

  /**
   * This method will find the buckets that are minimally occupied and are not reference counted
   * and will free them completely, without any constraint on the access times of the elements. As
   * a result it will completely free at most the number of buckets passed; sometimes it might
   * free fewer, due to changing refCounts.
   * @param completelyFreeBucketsNeeded number of buckets to free
   **/
  private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
    if (!isCacheInitialized("BucketCache::freeEntireBuckets")) {
      return;
    }

    if (completelyFreeBucketsNeeded != 0) {
      // First we will build a set where the offsets are reference counted, usually
      // this set is small around O(Handler Count) unless something else is wrong
      Set<Integer> inUseBuckets = new HashSet<>();
      backingMap.forEach((k, be) -> {
        if (be.isRpcRef()) {
          inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset()));
        }
      });
      Set<Integer> candidateBuckets =
        bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded);
      for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
        if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) {
          evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue());
        }
      }
    }
  }

  private long calculateBytesToFree(StringBuilder msgBuffer) {
    long bytesToFreeWithoutExtra = 0;
    BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
    long[] bytesToFreeForBucket = new long[stats.length];
1059 for (int i = 0; i < stats.length; i++) { 1060 bytesToFreeForBucket[i] = 0; 1061 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 1062 freeGoal = Math.max(freeGoal, 1); 1063 if (stats[i].freeCount() < freeGoal) { 1064 bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount()); 1065 bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; 1066 if (msgBuffer != null) { 1067 msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" 1068 + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); 1069 } 1070 } 1071 } 1072 if (msgBuffer != null) { 1073 msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); 1074 } 1075 return bytesToFreeWithoutExtra; 1076 } 1077 1078 /** 1079 * Free the space if the used size reaches acceptableSize() or one size block couldn't be 1080 * allocated. When freeing the space, we use the LRU algorithm and ensure there must be some 1081 * blocks evicted 1082 * @param why Why we are being called 1083 */ 1084 void freeSpace(final String why) { 1085 if (!isCacheInitialized("BucketCache::freeSpace")) { 1086 return; 1087 } 1088 // Ensure only one freeSpace progress at a time 1089 if (!freeSpaceLock.tryLock()) { 1090 return; 1091 } 1092 try { 1093 freeInProgress = true; 1094 StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null; 1095 long bytesToFreeWithoutExtra = calculateBytesToFree(msgBuffer); 1096 if (bytesToFreeWithoutExtra <= 0) { 1097 return; 1098 } 1099 long currentSize = bucketAllocator.getUsedSize(); 1100 long totalSize = bucketAllocator.getTotalSize(); 1101 if (LOG.isDebugEnabled() && msgBuffer != null) { 1102 LOG.debug("Free started because \"" + why + "\"; " + msgBuffer + " of current used=" 1103 + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" 1104 + StringUtils.byteDesc(realCacheSize.sum()) + ", total=" 1105 + StringUtils.byteDesc(totalSize)); 1106 } 1107 long bytesToFreeWithExtra = 1108 (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor)); 1109 // Instantiate priority buckets 1110 BucketEntryGroup bucketSingle = 1111 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor)); 1112 BucketEntryGroup bucketMulti = 1113 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(multiFactor)); 1114 BucketEntryGroup bucketMemory = 1115 new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor)); 1116 1117 Set<String> allValidFiles = null; 1118 // We need the region/stores/files tree, in order to figure out if a block is "orphan" or not. 1119 // See further comments below for more details. 1120 if (onlineRegions != null) { 1121 allValidFiles = BlockCacheUtil.listAllFilesNames(onlineRegions); 1122 } 1123 // the cached time is recored in nanos, so we need to convert the grace period accordingly 1124 long orphanGracePeriodNanos = orphanBlockGracePeriod * 1000000; 1125 long bytesFreed = 0; 1126 // Scan entire map putting bucket entry into appropriate bucket entry 1127 // group 1128 for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) { 1129 BlockCacheKey key = bucketEntryWithKey.getKey(); 1130 BucketEntry entry = bucketEntryWithKey.getValue(); 1131 // Under certain conditions, blocks for regions not on the current region server might 1132 // be hanging on the cache. 
For example, when using the persistent cache feature, if the 1133 // RS crashes, then if not the same regions are assigned back once its online again, blocks 1134 // for the previous online regions would be recovered and stay in the cache. These would be 1135 // "orphan" blocks, as the files these blocks belong to are not in any of the online 1136 // regions. 1137 // "Orphan" blocks are a pure waste of cache space and should be evicted first during 1138 // the freespace run. 1139 // Compactions and Flushes may cache blocks before its files are completely written. In 1140 // these cases the file won't be found in any of the online regions stores, but the block 1141 // shouldn't be evicted. To avoid this, we defined this 1142 // hbase.bucketcache.block.orphan.evictgraceperiod property, to account for a grace 1143 // period (default 24 hours) where a block should be checked if it's an orphan block. 1144 if ( 1145 allValidFiles != null 1146 && entry.getCachedTime() < (System.nanoTime() - orphanGracePeriodNanos) 1147 ) { 1148 if (!allValidFiles.contains(key.getHfileName())) { 1149 if (evictBucketEntryIfNoRpcReferenced(key, entry)) { 1150 // We calculate the freed bytes, but we don't stop if the goal was reached because 1151 // these are orphan blocks anyway, so let's leverage this run of freeSpace 1152 // to get rid of all orphans at once. 1153 bytesFreed += entry.getLength(); 1154 continue; 1155 } 1156 } 1157 } 1158 switch (entry.getPriority()) { 1159 case SINGLE: { 1160 bucketSingle.add(bucketEntryWithKey); 1161 break; 1162 } 1163 case MULTI: { 1164 bucketMulti.add(bucketEntryWithKey); 1165 break; 1166 } 1167 case MEMORY: { 1168 bucketMemory.add(bucketEntryWithKey); 1169 break; 1170 } 1171 } 1172 } 1173 PriorityQueue<BucketEntryGroup> bucketQueue = 1174 new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow)); 1175 1176 bucketQueue.add(bucketSingle); 1177 bucketQueue.add(bucketMulti); 1178 bucketQueue.add(bucketMemory); 1179 1180 int remainingBuckets = bucketQueue.size(); 1181 1182 BucketEntryGroup bucketGroup; 1183 while ((bucketGroup = bucketQueue.poll()) != null) { 1184 long overflow = bucketGroup.overflow(); 1185 if (overflow > 0) { 1186 long bucketBytesToFree = 1187 Math.min(overflow, (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets); 1188 bytesFreed += bucketGroup.free(bucketBytesToFree); 1189 } 1190 remainingBuckets--; 1191 } 1192 1193 // Check and free if there are buckets that still need freeing of space 1194 if (bucketSizesAboveThresholdCount(minFactor) > 0) { 1195 bucketQueue.clear(); 1196 remainingBuckets = 3; 1197 bucketQueue.add(bucketSingle); 1198 bucketQueue.add(bucketMulti); 1199 bucketQueue.add(bucketMemory); 1200 while ((bucketGroup = bucketQueue.poll()) != null) { 1201 long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets; 1202 bytesFreed += bucketGroup.free(bucketBytesToFree); 1203 remainingBuckets--; 1204 } 1205 } 1206 // Even after the above free we might still need freeing because of the 1207 // De-fragmentation of the buckets (also called Slab Calcification problem), i.e 1208 // there might be some buckets where the occupancy is very sparse and thus are not 1209 // yielding the free for the other bucket sizes, the fix for this to evict some 1210 // of the buckets, we do this by evicting the buckets that are least fulled 1211 freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * bucketSizesAboveThresholdCount(1.0f)); 1212 1213 if (LOG.isDebugEnabled()) { 1214 long single = bucketSingle.totalSize(); 1215 long multi = 
bucketMulti.totalSize(); 1216 long memory = bucketMemory.totalSize(); 1217 if (LOG.isDebugEnabled()) { 1218 LOG.debug("Bucket cache free space completed; " + "freed=" 1219 + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc(totalSize) 1220 + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi=" 1221 + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory)); 1222 } 1223 } 1224 } catch (Throwable t) { 1225 LOG.warn("Failed freeing space", t); 1226 } finally { 1227 cacheStats.evict(); 1228 freeInProgress = false; 1229 freeSpaceLock.unlock(); 1230 } 1231 } 1232 1233 // This handles flushing the RAM cache to IOEngine. 1234 class WriterThread extends Thread { 1235 private final BlockingQueue<RAMQueueEntry> inputQueue; 1236 private volatile boolean writerEnabled = true; 1237 private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE); 1238 1239 WriterThread(BlockingQueue<RAMQueueEntry> queue) { 1240 super("BucketCacheWriterThread"); 1241 this.inputQueue = queue; 1242 } 1243 1244 // Used for test 1245 void disableWriter() { 1246 this.writerEnabled = false; 1247 } 1248 1249 @Override 1250 public void run() { 1251 List<RAMQueueEntry> entries = new ArrayList<>(); 1252 try { 1253 while (isCacheEnabled() && writerEnabled) { 1254 try { 1255 try { 1256 // Blocks 1257 entries = getRAMQueueEntries(inputQueue, entries); 1258 } catch (InterruptedException ie) { 1259 if (!isCacheEnabled() || !writerEnabled) { 1260 break; 1261 } 1262 } 1263 doDrain(entries, metaBuff); 1264 } catch (Exception ioe) { 1265 LOG.error("WriterThread encountered error", ioe); 1266 } 1267 } 1268 } catch (Throwable t) { 1269 LOG.warn("Failed doing drain", t); 1270 } 1271 LOG.info(this.getName() + " exiting, cacheEnabled=" + isCacheEnabled()); 1272 } 1273 } 1274 1275 /** 1276 * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing 1277 * cache with a new block for the same cache key. there's a corner case: one thread cache a block 1278 * in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another new block 1279 * with the same cache key do the same thing for the same cache key, so if not evict the previous 1280 * bucket entry, then memory leak happen because the previous bucketEntry is gone but the 1281 * bucketAllocator do not free its memory. 1282 * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey 1283 * cacheKey, Cacheable newBlock) 1284 * @param key Block cache key 1285 * @param bucketEntry Bucket entry to put into backingMap. 1286 */ 1287 protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { 1288 BucketEntry previousEntry = backingMap.put(key, bucketEntry); 1289 blocksByHFile.add(key); 1290 updateRegionCachedSize(key.getFilePath(), bucketEntry.getLength()); 1291 if (previousEntry != null && previousEntry != bucketEntry) { 1292 previousEntry.withWriteLock(offsetLock, () -> { 1293 blockEvicted(key, previousEntry, false, false); 1294 return null; 1295 }); 1296 } 1297 } 1298 1299 /** 1300 * Prepare and return a warning message for Bucket Allocator Exception 1301 * @param fle The exception 1302 * @param re The RAMQueueEntry for which the exception was thrown. 1303 * @return A warning message created from the input RAMQueueEntry object. 
1304 */ 1305 private static String getAllocationFailWarningMessage(final BucketAllocatorException fle, 1306 final RAMQueueEntry re) { 1307 final StringBuilder sb = new StringBuilder(); 1308 sb.append("Most recent failed allocation after "); 1309 sb.append(ALLOCATION_FAIL_LOG_TIME_PERIOD); 1310 sb.append(" ms;"); 1311 if (re != null) { 1312 if (re.getData() instanceof HFileBlock) { 1313 final HFileContext fileContext = ((HFileBlock) re.getData()).getHFileContext(); 1314 final String columnFamily = Bytes.toString(fileContext.getColumnFamily()); 1315 final String tableName = Bytes.toString(fileContext.getTableName()); 1316 if (tableName != null) { 1317 sb.append(" Table: "); 1318 sb.append(tableName); 1319 } 1320 if (columnFamily != null) { 1321 sb.append(" CF: "); 1322 sb.append(columnFamily); 1323 } 1324 sb.append(" HFile: "); 1325 if (fileContext.getHFileName() != null) { 1326 sb.append(fileContext.getHFileName()); 1327 } else { 1328 sb.append(re.getKey()); 1329 } 1330 } else { 1331 sb.append(" HFile: "); 1332 sb.append(re.getKey()); 1333 } 1334 } 1335 sb.append(" Message: "); 1336 sb.append(fle.getMessage()); 1337 return sb.toString(); 1338 } 1339 1340 /** 1341 * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. Process all that 1342 * are passed in even if failure being sure to remove from ramCache else we'll never undo the 1343 * references and we'll OOME. 1344 * @param entries Presumes list passed in here will be processed by this invocation only. No 1345 * interference expected. 1346 */ 1347 void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException { 1348 if (entries.isEmpty()) { 1349 return; 1350 } 1351 // This method is a little hard to follow. We run through the passed in entries and for each 1352 // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must 1353 // do cleanup making sure we've cleared ramCache of all entries regardless of whether we 1354 // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by 1355 // filling ramCache. We do the clean up by again running through the passed in entries 1356 // doing extra work when we find a non-null bucketEntries corresponding entry. 1357 final int size = entries.size(); 1358 BucketEntry[] bucketEntries = new BucketEntry[size]; 1359 // Index updated inside loop if success or if we can't succeed. We retry if cache is full 1360 // when we go to add an entry by going around the loop again without upping the index. 1361 int index = 0; 1362 while (isCacheEnabled() && index < size) { 1363 RAMQueueEntry re = null; 1364 try { 1365 re = entries.get(index); 1366 if (re == null) { 1367 LOG.warn("Couldn't get entry or changed on us; who else is messing with it?"); 1368 index++; 1369 continue; 1370 } 1371 // Reset the position for reuse. 1372 // It should be guaranteed that the data in the metaBuff has been transferred to the 1373 // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already 1374 // transferred with our current IOEngines. Should take care, when we have new kinds of 1375 // IOEngine in the future. 1376 metaBuff.clear(); 1377 BucketEntry bucketEntry = 1378 re.writeToCache(ioEngine, bucketAllocator, realCacheSize, this::createRecycler, metaBuff); 1379 // Successfully added. Up index and add bucketEntry. Clear io exceptions. 
1380 bucketEntries[index] = bucketEntry; 1381 if (ioErrorStartTime > 0) { 1382 ioErrorStartTime = -1; 1383 } 1384 index++; 1385 } catch (BucketAllocatorException fle) { 1386 long currTs = EnvironmentEdgeManager.currentTime(); 1387 cacheStats.allocationFailed(); // Record the warning. 1388 if ( 1389 allocFailLogPrevTs == 0 || (currTs - allocFailLogPrevTs) > ALLOCATION_FAIL_LOG_TIME_PERIOD 1390 ) { 1391 LOG.warn(getAllocationFailWarningMessage(fle, re)); 1392 allocFailLogPrevTs = currTs; 1393 } 1394 // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below. 1395 bucketEntries[index] = null; 1396 index++; 1397 } catch (CacheFullException cfe) { 1398 // Cache full when we tried to add. Try freeing space and then retrying (don't up index) 1399 if (!freeInProgress) { 1400 freeSpace("Full!"); 1401 } else { 1402 Thread.sleep(50); 1403 } 1404 } catch (IOException ioex) { 1405 // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem. 1406 LOG.error("Failed writing to bucket cache", ioex); 1407 checkIOErrorIsTolerated(); 1408 } 1409 } 1410 1411 // Make sure data pages are written on media before we update maps. 1412 try { 1413 ioEngine.sync(); 1414 } catch (IOException ioex) { 1415 LOG.error("Failed syncing IO engine", ioex); 1416 checkIOErrorIsTolerated(); 1417 // Since we failed sync, free the blocks in bucket allocator 1418 for (int i = 0; i < entries.size(); ++i) { 1419 BucketEntry bucketEntry = bucketEntries[i]; 1420 if (bucketEntry != null) { 1421 bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength()); 1422 bucketEntries[i] = null; 1423 } 1424 } 1425 } 1426 1427 // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if 1428 // success or error. 1429 for (int i = 0; i < size; ++i) { 1430 BlockCacheKey key = entries.get(i).getKey(); 1431 // Only add if non-null entry. 1432 if (bucketEntries[i] != null) { 1433 putIntoBackingMap(key, bucketEntries[i]); 1434 if (ioEngine.isPersistent()) { 1435 setCacheInconsistent(true); 1436 } 1437 } 1438 // Always remove from ramCache even if we failed adding it to the block cache above. 1439 boolean existed = ramCache.remove(key, re -> { 1440 if (re != null) { 1441 heapSize.add(-1 * re.getData().heapSize()); 1442 } 1443 }); 1444 if (!existed && bucketEntries[i] != null) { 1445 // Block should have already been evicted. Remove it and free space. 1446 final BucketEntry bucketEntry = bucketEntries[i]; 1447 bucketEntry.withWriteLock(offsetLock, () -> { 1448 if (backingMap.remove(key, bucketEntry)) { 1449 blockEvicted(key, bucketEntry, false, false); 1450 } 1451 return null; 1452 }); 1453 } 1454 } 1455 1456 long used = bucketAllocator.getUsedSize(); 1457 if (used > acceptableSize()) { 1458 freeSpace("Used=" + used + " > acceptable=" + acceptableSize()); 1459 } 1460 return; 1461 } 1462 1463 /** 1464 * Blocks until elements available in {@code q} then tries to grab as many as possible before 1465 * returning. 1466 * @param receptacle Where to stash the elements taken from queue. We clear before we use it just 1467 * in case. 1468 * @param q The queue to take from. 1469 * @return {@code receptacle} laden with elements taken from the queue or empty if none found. 1470 */ 1471 static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q, 1472 List<RAMQueueEntry> receptacle) throws InterruptedException { 1473 // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it 1474 // ok even if list grew to accommodate thousands. 
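 // Note: the take() below blocks until at least one entry is available; drainTo() then moves,
 // without blocking, whatever else is already queued into the receptacle.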
1475 receptacle.clear(); 1476 receptacle.add(q.take()); 1477 q.drainTo(receptacle); 1478 return receptacle; 1479 } 1480 1481 /** 1482 * @see #retrieveFromFile(int[]) 1483 */ 1484 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION", 1485 justification = "false positive, try-with-resources ensures close is called.") 1486 void persistToFile() throws IOException { 1487 LOG.debug("Thread {} started persisting bucket cache to file", 1488 Thread.currentThread().getName()); 1489 if (!isCachePersistent()) { 1490 throw new IOException("Attempt to persist non-persistent cache mappings!"); 1491 } 1492 File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime()); 1493 try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) { 1494 LOG.debug("Persist in new chunked persistence format."); 1495 1496 persistChunkedBackingMap(fos); 1497 1498 LOG.debug( 1499 "PersistToFile: after persisting backing map size: {}, fullycachedFiles size: {}," 1500 + " file name: {}", 1501 backingMap.size(), fullyCachedFiles.size(), tempPersistencePath.getName()); 1502 } catch (IOException e) { 1503 LOG.error("Failed to persist bucket cache to file", e); 1504 throw e; 1505 } catch (Throwable e) { 1506 LOG.error("Failed during persist bucket cache to file: ", e); 1507 throw e; 1508 } 1509 LOG.debug("Thread {} finished persisting bucket cache to file, renaming", 1510 Thread.currentThread().getName()); 1511 if (!tempPersistencePath.renameTo(new File(persistencePath))) { 1512 LOG.warn("Failed to commit cache persistent file. We might lose cached blocks if " 1513 + "RS crashes/restarts before we successfully checkpoint again."); 1514 } 1515 } 1516 1517 public boolean isCachePersistent() { 1518 return ioEngine.isPersistent() && persistencePath != null; 1519 } 1520 1521 @Override 1522 public Optional<Map<String, Long>> getRegionCachedInfo() { 1523 return Optional.of(Collections.unmodifiableMap(regionCachedSize)); 1524 } 1525 1526 /** 1527 * @see #persistToFile() 1528 */ 1529 private void retrieveFromFile(int[] bucketSizes) throws IOException { 1530 LOG.info("Started retrieving bucket cache from file"); 1531 File persistenceFile = new File(persistencePath); 1532 if (!persistenceFile.exists()) { 1533 LOG.warn("Persistence file missing! " 1534 + "It's ok if it's first run after enabling persistent cache."); 1535 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1536 blockNumber.add(backingMap.size()); 1537 backingMapValidated.set(true); 1538 return; 1539 } 1540 assert !isCacheEnabled(); 1541 1542 try (FileInputStream in = new FileInputStream(persistenceFile)) { 1543 int pblen = ProtobufMagic.lengthOfPBMagic(); 1544 byte[] pbuf = new byte[pblen]; 1545 IOUtils.readFully(in, pbuf, 0, pblen); 1546 if (ProtobufMagic.isPBMagicPrefix(pbuf)) { 1547 LOG.info("Reading old format of persistence."); 1548 // The old non-chunked version of backing map persistence. 1549 parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in)); 1550 } else if (Arrays.equals(pbuf, BucketProtoUtils.PB_MAGIC_V2)) { 1551 // The new persistence format of chunked persistence. 1552 LOG.info("Reading new chunked format of persistence."); 1553 retrieveChunkedBackingMap(in); 1554 } else { 1555 // In 3.0 we have enough flexibility to dump the old cache data. 1556 // TODO: In 2.x line, this might need to be filled in to support reading the old format 1557 throw new IOException( 1558 "Persistence file does not start with protobuf magic number. 
" + persistencePath); 1559 } 1560 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1561 blockNumber.add(backingMap.size()); 1562 LOG.info("Bucket cache retrieved from file successfully with size: {}", backingMap.size()); 1563 } 1564 } 1565 1566 private void updateRegionSizeMapWhileRetrievingFromFile() { 1567 // Update the regionCachedSize with the region size while restarting the region server 1568 if (LOG.isDebugEnabled()) { 1569 LOG.debug("Updating region size map after retrieving cached file list"); 1570 dumpPrefetchList(); 1571 } 1572 regionCachedSize.clear(); 1573 fullyCachedFiles.forEach((hFileName, hFileSize) -> { 1574 // Get the region name for each file 1575 String regionEncodedName = hFileSize.getFirst(); 1576 long cachedFileSize = hFileSize.getSecond(); 1577 regionCachedSize.merge(regionEncodedName, cachedFileSize, 1578 (oldpf, fileSize) -> oldpf + fileSize); 1579 }); 1580 } 1581 1582 private void dumpPrefetchList() { 1583 for (Map.Entry<String, Pair<String, Long>> outerEntry : fullyCachedFiles.entrySet()) { 1584 LOG.debug("Cached File Entry:<{},<{},{}>>", outerEntry.getKey(), 1585 outerEntry.getValue().getFirst(), outerEntry.getValue().getSecond()); 1586 } 1587 } 1588 1589 private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) 1590 throws IOException { 1591 if (capacitySize != cacheCapacity) { 1592 throw new IOException("Mismatched cache capacity:" + StringUtils.byteDesc(capacitySize) 1593 + ", expected: " + StringUtils.byteDesc(cacheCapacity)); 1594 } 1595 if (!ioEngine.getClass().getName().equals(ioclass)) { 1596 throw new IOException("Class name for IO engine mismatch: " + ioclass + ", expected:" 1597 + ioEngine.getClass().getName()); 1598 } 1599 if (!backingMap.getClass().getName().equals(mapclass)) { 1600 throw new IOException("Class name for cache map mismatch: " + mapclass + ", expected:" 1601 + backingMap.getClass().getName()); 1602 } 1603 } 1604 1605 private void verifyFileIntegrity(BucketCacheProtos.BucketCacheEntry proto) { 1606 try { 1607 if (proto.hasChecksum()) { 1608 ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), 1609 algorithm); 1610 } 1611 backingMapValidated.set(true); 1612 } catch (IOException e) { 1613 LOG.warn("Checksum for cache file failed. " 1614 + "We need to validate each cache key in the backing map. " 1615 + "This may take some time, so we'll do it in a background thread,"); 1616 1617 Runnable cacheValidator = () -> { 1618 while (bucketAllocator == null) { 1619 try { 1620 Thread.sleep(50); 1621 } catch (InterruptedException ex) { 1622 throw new RuntimeException(ex); 1623 } 1624 } 1625 long startTime = EnvironmentEdgeManager.currentTime(); 1626 int totalKeysOriginally = backingMap.size(); 1627 for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) { 1628 try { 1629 ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue()); 1630 } catch (IOException e1) { 1631 LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey()); 1632 evictBlock(keyEntry.getKey()); 1633 fileNotFullyCached(keyEntry.getKey().getHfileName()); 1634 } 1635 } 1636 backingMapValidated.set(true); 1637 LOG.info("Finished validating {} keys in the backing map. Recovered: {}. 
This took {}ms.", 1638 totalKeysOriginally, backingMap.size(), 1639 (EnvironmentEdgeManager.currentTime() - startTime)); 1640 }; 1641 Thread t = new Thread(cacheValidator); 1642 t.setDaemon(true); 1643 t.start(); 1644 } 1645 } 1646 1647 private void updateCacheIndex(BucketCacheProtos.BackingMap chunk, 1648 java.util.Map<java.lang.Integer, java.lang.String> deserializer) throws IOException { 1649 Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair2 = 1650 BucketProtoUtils.fromPB(deserializer, chunk, this::createRecycler); 1651 backingMap.putAll(pair2.getFirst()); 1652 blocksByHFile.addAll(pair2.getSecond()); 1653 } 1654 1655 private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException { 1656 Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair = 1657 BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), 1658 this::createRecycler); 1659 backingMap = pair.getFirst(); 1660 blocksByHFile = pair.getSecond(); 1661 fullyCachedFiles.clear(); 1662 fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap())); 1663 1664 LOG.info("After retrieval Backing map size: {}, fullyCachedFiles size: {}", backingMap.size(), 1665 fullyCachedFiles.size()); 1666 1667 verifyFileIntegrity(proto); 1668 updateRegionSizeMapWhileRetrievingFromFile(); 1669 verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); 1670 } 1671 1672 private void persistChunkedBackingMap(FileOutputStream fos) throws IOException { 1673 LOG.debug( 1674 "persistToFile: before persisting backing map size: {}, " 1675 + "fullycachedFiles size: {}, chunkSize: {}", 1676 backingMap.size(), fullyCachedFiles.size(), persistenceChunkSize); 1677 1678 BucketProtoUtils.serializeAsPB(this, fos, persistenceChunkSize); 1679 1680 LOG.debug( 1681 "persistToFile: after persisting backing map size: {}, " + "fullycachedFiles size: {}", 1682 backingMap.size(), fullyCachedFiles.size()); 1683 } 1684 1685 private void retrieveChunkedBackingMap(FileInputStream in) throws IOException { 1686 1687 // Read the first chunk that has all the details. 1688 BucketCacheProtos.BucketCacheEntry cacheEntry = 1689 BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in); 1690 1691 fullyCachedFiles.clear(); 1692 fullyCachedFiles.putAll(BucketProtoUtils.fromPB(cacheEntry.getCachedFilesMap())); 1693 1694 backingMap.clear(); 1695 blocksByHFile.clear(); 1696 1697 // Read the backing map entries in batches. 1698 int numChunks = 0; 1699 while (in.available() > 0) { 1700 updateCacheIndex(BucketCacheProtos.BackingMap.parseDelimitedFrom(in), 1701 cacheEntry.getDeserializersMap()); 1702 numChunks++; 1703 } 1704 1705 LOG.info("Retrieved {} of chunks with blockCount = {}.", numChunks, backingMap.size()); 1706 verifyFileIntegrity(cacheEntry); 1707 verifyCapacityAndClasses(cacheEntry.getCacheCapacity(), cacheEntry.getIoClass(), 1708 cacheEntry.getMapClass()); 1709 updateRegionSizeMapWhileRetrievingFromFile(); 1710 } 1711 1712 /** 1713 * Check whether we tolerate IO error this time. 
If the duration of IOEngine throwing errors
1714 * exceeds ioErrorsTolerationDuration, we will disable the cache.
1715 */
1716 private void checkIOErrorIsTolerated() {
1717 long now = EnvironmentEdgeManager.currentTime();
1718 // Do a single read to a local variable to avoid timing issue - HBASE-24454
1719 long ioErrorStartTimeTmp = this.ioErrorStartTime;
1720 if (ioErrorStartTimeTmp > 0) {
1721 if (isCacheEnabled() && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) {
1722 LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration
1723 + "ms, disabling cache, please check your IOEngine");
1724 disableCache();
1725 }
1726 } else {
1727 this.ioErrorStartTime = now;
1728 }
1729 }
1730
1731 /**
1732 * Used to shut down the cache -or- turn it off in the case of something broken.
1733 */
1734 private void disableCache() {
1735 if (!isCacheEnabled()) {
1736 return;
1737 }
1738 LOG.info("Disabling cache");
1739 cacheState = CacheState.DISABLED;
1740 ioEngine.shutdown();
1741 this.scheduleThreadPool.shutdown();
1742 for (int i = 0; i < writerThreads.length; ++i)
1743 writerThreads[i].interrupt();
1744 this.ramCache.clear();
1745 if (!ioEngine.isPersistent() || persistencePath == null) {
1746 // Nothing will be serialized out here (that needs a persistent ioengine and a path), so clear the in-memory maps.
1747 this.backingMap.clear();
1748 this.blocksByHFile.clear();
1749 this.fullyCachedFiles.clear();
1750 this.regionCachedSize.clear();
1751 }
1752 }
1753
1754 private void join() throws InterruptedException {
1755 for (int i = 0; i < writerThreads.length; ++i)
1756 writerThreads[i].join();
1757 }
1758
1759 @Override
1760 public void shutdown() {
1761 disableCache();
1762 LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write="
1763 + persistencePath);
1764 if (ioEngine.isPersistent() && persistencePath != null) {
1765 try {
1766 join();
1767 if (cachePersister != null) {
1768 LOG.info("Shutting down cache persister thread.");
1769 cachePersister.shutdown();
1770 while (cachePersister.isAlive()) {
1771 Thread.sleep(10);
1772 }
1773 }
1774 persistToFile();
1775 } catch (IOException ex) {
1776 LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
1777 } catch (InterruptedException e) {
1778 LOG.warn("Failed to persist data on exit", e);
1779 }
1780 }
1781 }
1782
1783 /**
1784 * Needed mostly for UTs that might run in the same VM and create different BucketCache instances
1785 * on different UT methods.
1786 */
1787 @Override
1788 protected void finalize() {
1789 if (cachePersister != null && !cachePersister.isInterrupted()) {
1790 cachePersister.interrupt();
1791 }
1792 }
1793
1794 @Override
1795 public CacheStats getStats() {
1796 return cacheStats;
1797 }
1798
1799 public BucketAllocator getAllocator() {
1800 return this.bucketAllocator;
1801 }
1802
1803 @Override
1804 public long heapSize() {
1805 return this.heapSize.sum();
1806 }
1807
1808 @Override
1809 public long size() {
1810 return this.realCacheSize.sum();
1811 }
1812
1813 @Override
1814 public long getCurrentDataSize() {
1815 return size();
1816 }
1817
1818 @Override
1819 public long getFreeSize() {
1820 if (!isCacheInitialized("BucketCache:getFreeSize")) {
1821 return 0;
1822 }
1823 return this.bucketAllocator.getFreeSize();
1824 }
1825
1826 @Override
1827 public long getBlockCount() {
1828 return this.blockNumber.sum();
1829 }
1830
1831 @Override
1832 public long getDataBlockCount() {
1833 return getBlockCount();
1834 }
1835
1836 @Override
1837 public long getCurrentSize() {
1838 if (!isCacheInitialized("BucketCache::getCurrentSize")) {
1839 return 0;
1840 }
1841 return this.bucketAllocator.getUsedSize();
1842 }
1843
1844 protected String getAlgorithm() {
1845 return algorithm;
1846 }
1847
1848 /**
1849 * Evicts all blocks for a specific HFile.
1850 * <p>
1851 * This is used for evict-on-close to remove all blocks of a specific HFile.
1852 * @return the number of blocks evicted
1853 */
1854 @Override
1855 public int evictBlocksByHfileName(String hfileName) {
1856 return evictBlocksRangeByHfileName(hfileName, 0, Long.MAX_VALUE);
1857 }
1858
1859 @Override
1860 public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) {
1861 fileNotFullyCached(hfileName);
1862 Set<BlockCacheKey> keySet = getAllCacheKeysForFile(hfileName, initOffset, endOffset);
1863 LOG.debug("found {} blocks for file {}, starting offset: {}, end offset: {}", keySet.size(),
1864 hfileName, initOffset, endOffset);
1865 int numEvicted = 0;
1866 for (BlockCacheKey key : keySet) {
1867 if (evictBlock(key)) {
1868 ++numEvicted;
1869 }
1870 }
1871 return numEvicted;
1872 }
1873
1874 private Set<BlockCacheKey> getAllCacheKeysForFile(String hfileName, long init, long end) {
1875 return blocksByHFile.subSet(new BlockCacheKey(hfileName, init), true,
1876 new BlockCacheKey(hfileName, end), true);
1877 }
1878
1879 /**
1880 * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each
1881 * priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate
1882 * number of elements out of each according to configuration parameters and their relative sizes.
1883 */
1884 private class BucketEntryGroup {
1885
1886 private CachedEntryQueue queue;
1887 private long totalSize = 0;
1888 private long bucketSize;
1889
1890 public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
1891 this.bucketSize = bucketSize;
1892 queue = new CachedEntryQueue(bytesToFree, blockSize);
1893 totalSize = 0;
1894 }
1895
1896 public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
1897 totalSize += block.getValue().getLength();
1898 queue.add(block);
1899 }
1900
1901 public long free(long toFree) {
1902 Map.Entry<BlockCacheKey, BucketEntry> entry;
1903 long freedBytes = 0;
1904 // TODO avoid a cycling situation. We find no block which is not in use and so no way to free.
1905 // What to do then? Caching attempt fail? Need some changes in cacheBlock API?
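 // Poll entries off the queue and evict them one at a time (evictBucketEntryIfNoRpcReferenced
 // skips blocks that are still referenced by in-flight RPCs) until we have freed the requested
 // number of bytes or the queue is exhausted.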
1906 while ((entry = queue.pollLast()) != null) { 1907 BlockCacheKey blockCacheKey = entry.getKey(); 1908 BucketEntry be = entry.getValue(); 1909 if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) { 1910 freedBytes += be.getLength(); 1911 } 1912 if (freedBytes >= toFree) { 1913 return freedBytes; 1914 } 1915 } 1916 return freedBytes; 1917 } 1918 1919 public long overflow() { 1920 return totalSize - bucketSize; 1921 } 1922 1923 public long totalSize() { 1924 return totalSize; 1925 } 1926 } 1927 1928 /** 1929 * Block Entry stored in the memory with key,data and so on 1930 */ 1931 static class RAMQueueEntry { 1932 private final BlockCacheKey key; 1933 private final Cacheable data; 1934 private long accessCounter; 1935 private boolean inMemory; 1936 private boolean isCachePersistent; 1937 1938 RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory, 1939 boolean isCachePersistent) { 1940 this.key = bck; 1941 this.data = data; 1942 this.accessCounter = accessCounter; 1943 this.inMemory = inMemory; 1944 this.isCachePersistent = isCachePersistent; 1945 } 1946 1947 public Cacheable getData() { 1948 return data; 1949 } 1950 1951 public BlockCacheKey getKey() { 1952 return key; 1953 } 1954 1955 public void access(long accessCounter) { 1956 this.accessCounter = accessCounter; 1957 } 1958 1959 private ByteBuffAllocator getByteBuffAllocator() { 1960 if (data instanceof HFileBlock) { 1961 return ((HFileBlock) data).getByteBuffAllocator(); 1962 } 1963 return ByteBuffAllocator.HEAP; 1964 } 1965 1966 public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc, 1967 final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler, 1968 ByteBuffer metaBuff) throws IOException { 1969 int len = data.getSerializedLength(); 1970 // This cacheable thing can't be serialized 1971 if (len == 0) { 1972 return null; 1973 } 1974 if (isCachePersistent && data instanceof HFileBlock) { 1975 len += Long.BYTES; // we need to record the cache time for consistency check in case of 1976 // recovery 1977 } 1978 long offset = alloc.allocateBlock(len); 1979 boolean succ = false; 1980 BucketEntry bucketEntry = null; 1981 try { 1982 int diskSizeWithHeader = (data instanceof HFileBlock) 1983 ? ((HFileBlock) data).getOnDiskSizeWithHeader() 1984 : data.getSerializedLength(); 1985 bucketEntry = new BucketEntry(offset, len, diskSizeWithHeader, accessCounter, inMemory, 1986 createRecycler, getByteBuffAllocator()); 1987 bucketEntry.setDeserializerReference(data.getDeserializer()); 1988 if (data instanceof HFileBlock) { 1989 // If an instance of HFileBlock, save on some allocations. 1990 HFileBlock block = (HFileBlock) data; 1991 ByteBuff sliceBuf = block.getBufferReadOnly(); 1992 block.getMetaData(metaBuff); 1993 // adds the cache time prior to the block and metadata part 1994 if (isCachePersistent) { 1995 ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); 1996 buffer.putLong(bucketEntry.getCachedTime()); 1997 buffer.rewind(); 1998 ioEngine.write(buffer, offset); 1999 ioEngine.write(sliceBuf, (offset + Long.BYTES)); 2000 } else { 2001 ioEngine.write(sliceBuf, offset); 2002 } 2003 ioEngine.write(metaBuff, offset + len - metaBuff.limit()); 2004 } else { 2005 // Only used for testing. 
2006 ByteBuffer bb = ByteBuffer.allocate(len); 2007 data.serialize(bb, true); 2008 ioEngine.write(bb, offset); 2009 } 2010 succ = true; 2011 } finally { 2012 if (!succ) { 2013 alloc.freeBlock(offset, len); 2014 } 2015 } 2016 realCacheSize.add(len); 2017 return bucketEntry; 2018 } 2019 } 2020 2021 /** 2022 * Only used in test 2023 */ 2024 void stopWriterThreads() throws InterruptedException { 2025 for (WriterThread writerThread : writerThreads) { 2026 writerThread.disableWriter(); 2027 writerThread.interrupt(); 2028 writerThread.join(); 2029 } 2030 } 2031 2032 @Override 2033 public Iterator<CachedBlock> iterator() { 2034 // Don't bother with ramcache since stuff is in here only a little while. 2035 final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = this.backingMap.entrySet().iterator(); 2036 return new Iterator<CachedBlock>() { 2037 private final long now = System.nanoTime(); 2038 2039 @Override 2040 public boolean hasNext() { 2041 return i.hasNext(); 2042 } 2043 2044 @Override 2045 public CachedBlock next() { 2046 final Map.Entry<BlockCacheKey, BucketEntry> e = i.next(); 2047 return new CachedBlock() { 2048 @Override 2049 public String toString() { 2050 return BlockCacheUtil.toString(this, now); 2051 } 2052 2053 @Override 2054 public BlockPriority getBlockPriority() { 2055 return e.getValue().getPriority(); 2056 } 2057 2058 @Override 2059 public BlockType getBlockType() { 2060 // Not held by BucketEntry. Could add it if wanted on BucketEntry creation. 2061 return null; 2062 } 2063 2064 @Override 2065 public long getOffset() { 2066 return e.getKey().getOffset(); 2067 } 2068 2069 @Override 2070 public long getSize() { 2071 return e.getValue().getLength(); 2072 } 2073 2074 @Override 2075 public long getCachedTime() { 2076 return e.getValue().getCachedTime(); 2077 } 2078 2079 @Override 2080 public String getFilename() { 2081 return e.getKey().getHfileName(); 2082 } 2083 2084 @Override 2085 public int compareTo(CachedBlock other) { 2086 int diff = this.getFilename().compareTo(other.getFilename()); 2087 if (diff != 0) return diff; 2088 2089 diff = Long.compare(this.getOffset(), other.getOffset()); 2090 if (diff != 0) return diff; 2091 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { 2092 throw new IllegalStateException( 2093 "" + this.getCachedTime() + ", " + other.getCachedTime()); 2094 } 2095 return Long.compare(other.getCachedTime(), this.getCachedTime()); 2096 } 2097 2098 @Override 2099 public int hashCode() { 2100 return e.getKey().hashCode(); 2101 } 2102 2103 @Override 2104 public boolean equals(Object obj) { 2105 if (obj instanceof CachedBlock) { 2106 CachedBlock cb = (CachedBlock) obj; 2107 return compareTo(cb) == 0; 2108 } else { 2109 return false; 2110 } 2111 } 2112 }; 2113 } 2114 2115 @Override 2116 public void remove() { 2117 throw new UnsupportedOperationException(); 2118 } 2119 }; 2120 } 2121 2122 @Override 2123 public BlockCache[] getBlockCaches() { 2124 return null; 2125 } 2126 2127 public int getRpcRefCount(BlockCacheKey cacheKey) { 2128 BucketEntry bucketEntry = backingMap.get(cacheKey); 2129 if (bucketEntry != null) { 2130 return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 
0 : 1);
2131 }
2132 return 0;
2133 }
2134
2135 float getAcceptableFactor() {
2136 return acceptableFactor;
2137 }
2138
2139 float getMinFactor() {
2140 return minFactor;
2141 }
2142
2143 float getExtraFreeFactor() {
2144 return extraFreeFactor;
2145 }
2146
2147 float getSingleFactor() {
2148 return singleFactor;
2149 }
2150
2151 float getMultiFactor() {
2152 return multiFactor;
2153 }
2154
2155 float getMemoryFactor() {
2156 return memoryFactor;
2157 }
2158
2159 public String getPersistencePath() {
2160 return persistencePath;
2161 }
2162
2163 /**
2164 * Wraps the delegate ConcurrentMap while maintaining the reference count of each cached block.
2165 */
2166 static class RAMCache {
2167 /**
2168 * The map is defined as a {@link ConcurrentHashMap} explicitly here, because in
2169 * {@link RAMCache#get(BlockCacheKey)} and
2170 * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)} we need to guarantee
2171 * the atomicity of map#computeIfPresent(key, func) and map#putIfAbsent(key, func): the func must
2172 * execute exactly once, only when the key is present (or absent), and under the lock context.
2173 * Otherwise, the reference count of the block will be messed up. Notice that
2174 * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that.
2175 */
2176 final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>();
2177
2178 public boolean containsKey(BlockCacheKey key) {
2179 return delegate.containsKey(key);
2180 }
2181
2182 public RAMQueueEntry get(BlockCacheKey key) {
2183 return delegate.computeIfPresent(key, (k, re) -> {
2184 // It'll be referenced by RPC, so retain atomically here. If the get and retain were not
2185 // atomic, another thread could remove and release the block, and when retaining in this
2186 // thread we might retain a block with refCnt=0, which is disallowed. (see HBASE-22422)
2187 re.getData().retain();
2188 return re;
2189 });
2190 }
2191
2192 /**
2193 * Return the previous associated value, or null if absent. It has the same meaning as
2194 * {@link ConcurrentMap#putIfAbsent(Object, Object)}
2195 */
2196 public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) {
2197 AtomicBoolean absent = new AtomicBoolean(false);
2198 RAMQueueEntry re = delegate.computeIfAbsent(key, k -> {
2199 // The RAMCache now references this entry, so its reference count should be incremented.
2200 entry.getData().retain();
2201 absent.set(true);
2202 return entry;
2203 });
2204 return absent.get() ? null : re;
2205 }
2206
2207 public boolean remove(BlockCacheKey key) {
2208 return remove(key, re -> {
2209 });
2210 }
2211
2212 /**
2213 * A {@link Consumer} is taken here because once the removed entry releases its reference count,
2214 * its ByteBuffers may be recycled and accessing them outside this method would throw an
2215 * exception. The consumer accesses the entry being removed before its reference count is released.
2216 * Notice, don't change its reference count in the {@link Consumer} 2217 */ 2218 public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) { 2219 RAMQueueEntry previous = delegate.remove(key); 2220 action.accept(previous); 2221 if (previous != null) { 2222 previous.getData().release(); 2223 } 2224 return previous != null; 2225 } 2226 2227 public boolean isEmpty() { 2228 return delegate.isEmpty(); 2229 } 2230 2231 public void clear() { 2232 Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator(); 2233 while (it.hasNext()) { 2234 RAMQueueEntry re = it.next().getValue(); 2235 it.remove(); 2236 re.getData().release(); 2237 } 2238 } 2239 2240 public boolean hasBlocksForFile(String fileName) { 2241 return delegate.keySet().stream().filter(key -> key.getHfileName().equals(fileName)) 2242 .findFirst().isPresent(); 2243 } 2244 } 2245 2246 public Map<BlockCacheKey, BucketEntry> getBackingMap() { 2247 return backingMap; 2248 } 2249 2250 public AtomicBoolean getBackingMapValidated() { 2251 return backingMapValidated; 2252 } 2253 2254 @Override 2255 public Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() { 2256 return Optional.of(fullyCachedFiles); 2257 } 2258 2259 public static Optional<BucketCache> getBucketCacheFromCacheConfig(CacheConfig cacheConf) { 2260 if (cacheConf.getBlockCache().isPresent()) { 2261 BlockCache bc = cacheConf.getBlockCache().get(); 2262 if (bc instanceof CombinedBlockCache) { 2263 BlockCache l2 = ((CombinedBlockCache) bc).getSecondLevelCache(); 2264 if (l2 instanceof BucketCache) { 2265 return Optional.of((BucketCache) l2); 2266 } 2267 } else if (bc instanceof BucketCache) { 2268 return Optional.of((BucketCache) bc); 2269 } 2270 } 2271 return Optional.empty(); 2272 } 2273 2274 @Override 2275 public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int dataBlockCount, 2276 long size) { 2277 // block eviction may be happening in the background as prefetch runs, 2278 // so we need to count all blocks for this file in the backing map under 2279 // a read lock for the block offset 2280 final List<ReentrantReadWriteLock> locks = new ArrayList<>(); 2281 LOG.debug("Notifying caching completed for file {}, with total blocks {}, and data blocks {}", 2282 fileName, totalBlockCount, dataBlockCount); 2283 try { 2284 final MutableInt count = new MutableInt(); 2285 LOG.debug("iterating over {} entries in the backing map", backingMap.size()); 2286 Set<BlockCacheKey> result = getAllCacheKeysForFile(fileName.getName(), 0, Long.MAX_VALUE); 2287 if (result.isEmpty() && StoreFileInfo.isReference(fileName)) { 2288 result = getAllCacheKeysForFile( 2289 StoreFileInfo.getReferredToRegionAndFile(fileName.getName()).getSecond(), 0, 2290 Long.MAX_VALUE); 2291 } 2292 result.stream().forEach(entry -> { 2293 LOG.debug("found block for file {} in the backing map. 
Acquiring read lock for offset {}", 2294 fileName.getName(), entry.getOffset()); 2295 ReentrantReadWriteLock lock = offsetLock.getLock(entry.getOffset()); 2296 lock.readLock().lock(); 2297 locks.add(lock); 2298 if (backingMap.containsKey(entry) && entry.getBlockType() == BlockType.DATA) { 2299 count.increment(); 2300 } 2301 }); 2302 int metaCount = totalBlockCount - dataBlockCount; 2303 // BucketCache would only have data blocks 2304 if (dataBlockCount == count.getValue()) { 2305 LOG.debug("File {} has now been fully cached.", fileName); 2306 fileCacheCompleted(fileName, size); 2307 } else { 2308 LOG.debug( 2309 "Prefetch executor completed for {}, but only {} data blocks were cached. " 2310 + "Total data blocks for file: {}. " 2311 + "Checking for blocks pending cache in cache writer queue.", 2312 fileName, count.getValue(), dataBlockCount); 2313 if (ramCache.hasBlocksForFile(fileName.getName())) { 2314 for (ReentrantReadWriteLock lock : locks) { 2315 lock.readLock().unlock(); 2316 } 2317 locks.clear(); 2318 LOG.debug("There are still blocks pending caching for file {}. Will sleep 100ms " 2319 + "and try the verification again.", fileName); 2320 Thread.sleep(100); 2321 notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size); 2322 } else if ( 2323 (getAllCacheKeysForFile(fileName.getName(), 0, Long.MAX_VALUE).size() - metaCount) 2324 == dataBlockCount 2325 ) { 2326 LOG.debug("We counted {} data blocks, expected was {}, there was no more pending in " 2327 + "the cache write queue but we now found that total cached blocks for file {} " 2328 + "is equal to data block count.", count, dataBlockCount, fileName.getName()); 2329 fileCacheCompleted(fileName, size); 2330 } else { 2331 LOG.info("We found only {} data blocks cached from a total of {} for file {}, " 2332 + "but no blocks pending caching. Maybe cache is full or evictions " 2333 + "happened concurrently to cache prefetch.", count, dataBlockCount, fileName); 2334 } 2335 } 2336 } catch (InterruptedException e) { 2337 throw new RuntimeException(e); 2338 } finally { 2339 for (ReentrantReadWriteLock lock : locks) { 2340 lock.readLock().unlock(); 2341 } 2342 } 2343 } 2344 2345 @Override 2346 public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) { 2347 if (!isCacheInitialized("blockFitsIntoTheCache")) { 2348 return Optional.of(false); 2349 } 2350 2351 long currentUsed = bucketAllocator.getUsedSize(); 2352 boolean result = (currentUsed + block.getOnDiskSizeWithHeader()) < acceptableSize(); 2353 return Optional.of(result); 2354 } 2355 2356 @Override 2357 public Optional<Boolean> shouldCacheFile(String fileName) { 2358 // if we don't have the file in fullyCachedFiles, we should cache it 2359 return Optional.of(!fullyCachedFiles.containsKey(fileName)); 2360 } 2361 2362 @Override 2363 public Optional<Boolean> isAlreadyCached(BlockCacheKey key) { 2364 boolean foundKey = backingMap.containsKey(key); 2365 // if there's no entry for the key itself, we need to check if this key is for a reference, 2366 // and if so, look for a block from the referenced file using this getBlockForReference method. 2367 return Optional.of(foundKey ? true : getBlockForReference(key) != null); 2368 } 2369 2370 @Override 2371 public Optional<Integer> getBlockSize(BlockCacheKey key) { 2372 BucketEntry entry = backingMap.get(key); 2373 if (entry == null) { 2374 // the key might be for a reference tha we had found the block from the referenced file in 2375 // the cache when we first tried to cache it. 
2376 entry = getBlockForReference(key); 2377 return entry == null ? Optional.empty() : Optional.of(entry.getOnDiskSizeWithHeader()); 2378 } else { 2379 return Optional.of(entry.getOnDiskSizeWithHeader()); 2380 } 2381 2382 } 2383 2384 boolean isCacheInitialized(String api) { 2385 if (cacheState == CacheState.INITIALIZING) { 2386 LOG.warn("Bucket initialisation pending at {}", api); 2387 return false; 2388 } 2389 return true; 2390 } 2391 2392 @Override 2393 public boolean waitForCacheInitialization(long timeout) { 2394 try { 2395 while (cacheState == CacheState.INITIALIZING) { 2396 if (timeout <= 0) { 2397 break; 2398 } 2399 Thread.sleep(100); 2400 timeout -= 100; 2401 } 2402 } finally { 2403 return isCacheEnabled(); 2404 } 2405 } 2406}
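// A minimal usage sketch (illustrative only, not part of the class above): how a tool or test
// might obtain the BucketCache behind a CacheConfig and query it through the public accessors
// defined in this file. The cacheConf variable and the BlockCacheKey values are assumptions.
//
//   Optional<BucketCache> maybeCache = BucketCache.getBucketCacheFromCacheConfig(cacheConf);
//   maybeCache.ifPresent(cache -> {
//     if (cache.waitForCacheInitialization(60000)) {
//       LOG.info("blocks={} used={} free={}", cache.getBlockCount(), cache.getCurrentSize(),
//         cache.getFreeSize());
//       cache.isAlreadyCached(new BlockCacheKey("example-hfile", 0))
//         .ifPresent(cached -> LOG.info("block cached? {}", cached));
//     }
//   });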