001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import io.opentelemetry.api.trace.Span; 021import io.opentelemetry.context.Scope; 022import java.io.IOException; 023import java.net.InetSocketAddress; 024import java.nio.ByteBuffer; 025import java.util.ArrayList; 026import java.util.Iterator; 027import java.util.List; 028import java.util.NoSuchElementException; 029import java.util.concurrent.ExecutionException; 030import java.util.concurrent.Executors; 031import java.util.concurrent.ScheduledExecutorService; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicLong; 034import net.spy.memcached.CachedData; 035import net.spy.memcached.ConnectionFactoryBuilder; 036import net.spy.memcached.FailureMode; 037import net.spy.memcached.MemcachedClient; 038import net.spy.memcached.OperationTimeoutException; 039import net.spy.memcached.transcoders.Transcoder; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.hbase.io.ByteBuffAllocator; 042import org.apache.hadoop.hbase.nio.ByteBuff; 043import org.apache.hadoop.hbase.nio.SingleByteBuff; 044import org.apache.hadoop.hbase.trace.TraceUtil; 045import org.apache.hadoop.hbase.util.Addressing; 046import org.apache.hadoop.util.StringUtils; 047import org.apache.yetus.audience.InterfaceAudience; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 052 053/** 054 * Class to store blocks into memcached. This should only be used on a cluster of Memcached daemons 055 * that are tuned well and have a good network connection to the HBase regionservers. Any other use 056 * will likely slow down HBase greatly. 057 */ 058@InterfaceAudience.Private 059@SuppressWarnings("FutureReturnValueIgnored") 060public class MemcachedBlockCache implements BlockCache { 061 private static final Logger LOG = LoggerFactory.getLogger(MemcachedBlockCache.class.getName()); 062 063 // Some memcache versions won't take more than 1024 * 1024. So set the limit below 064 // that just in case this client is used with those versions. 065 public static final int MAX_SIZE = 1020 * 1024; 066 067 // Start memcached with -I <MAX_SIZE> to ensure it has the ability to store blocks of this size 068 public static final int MAX_TIME = 60 * 60 * 24 * 30; // 30 days, max allowed per the memcached 069 // spec 070 071 // Config key for what memcached servers to use. 072 // They should be specified in a comma sperated list with ports. 073 // like: 074 // 075 // host1:11211,host3:8080,host4:11211 076 public static final String MEMCACHED_CONFIG_KEY = "hbase.cache.memcached.servers"; 077 public static final String MEMCACHED_TIMEOUT_KEY = "hbase.cache.memcached.timeout"; 078 public static final String MEMCACHED_OPTIMEOUT_KEY = "hbase.cache.memcached.optimeout"; 079 public static final String MEMCACHED_OPTIMIZE_KEY = "hbase.cache.memcached.spy.optimze"; 080 public static final long MEMCACHED_DEFAULT_TIMEOUT = 500; 081 public static final boolean MEMCACHED_OPTIMIZE_DEFAULT = false; 082 public static final int STAT_THREAD_PERIOD = 60 * 5; 083 084 private final MemcachedClient client; 085 private final HFileBlockTranscoder tc = new HFileBlockTranscoder(); 086 private final CacheStats cacheStats = new CacheStats("MemcachedBlockCache"); 087 private final AtomicLong cachedCount = new AtomicLong(); 088 private final AtomicLong notCachedCount = new AtomicLong(); 089 private final AtomicLong cacheErrorCount = new AtomicLong(); 090 private final AtomicLong timeoutCount = new AtomicLong(); 091 092 /** Statistics thread schedule pool (for heavy debugging, could remove) */ 093 private transient final ScheduledExecutorService scheduleThreadPool = 094 Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() 095 .setNameFormat("MemcachedBlockCacheStatsExecutor").setDaemon(true).build()); 096 097 public MemcachedBlockCache(Configuration c) throws IOException { 098 LOG.info("Creating MemcachedBlockCache"); 099 100 long opTimeout = c.getLong(MEMCACHED_OPTIMEOUT_KEY, MEMCACHED_DEFAULT_TIMEOUT); 101 long queueTimeout = c.getLong(MEMCACHED_TIMEOUT_KEY, opTimeout + MEMCACHED_DEFAULT_TIMEOUT); 102 boolean optimize = c.getBoolean(MEMCACHED_OPTIMIZE_KEY, MEMCACHED_OPTIMIZE_DEFAULT); 103 104 ConnectionFactoryBuilder builder = 105 new ConnectionFactoryBuilder().setOpTimeout(opTimeout).setOpQueueMaxBlockTime(queueTimeout) 106 .setFailureMode(FailureMode.Redistribute).setShouldOptimize(optimize).setDaemon(true) 107 .setUseNagleAlgorithm(false).setReadBufferSize(MAX_SIZE); 108 109 // Assume only the localhost is serving memcached. 110 // A la mcrouter or co-locating memcached with split regionservers. 111 // 112 // If this config is a pool of memcached servers they will all be used according to the 113 // default hashing scheme defined by the memcached client. Spy Memecache client in this 114 // case. 115 String serverListString = c.get(MEMCACHED_CONFIG_KEY, "localhost:11211"); 116 String[] servers = serverListString.split(","); 117 // MemcachedClient requires InetSocketAddresses, we have to create them now. Implies any 118 // resolved identities cannot have their address mappings changed while the MemcachedClient 119 // instance is alive. We won't get a chance to trigger re-resolution. 120 List<InetSocketAddress> serverAddresses = new ArrayList<>(servers.length); 121 for (String s : servers) { 122 serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s)); 123 } 124 125 client = new MemcachedClient(builder.build(), serverAddresses); 126 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD, 127 STAT_THREAD_PERIOD, TimeUnit.SECONDS); 128 } 129 130 @Override 131 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { 132 cacheBlock(cacheKey, buf); 133 } 134 135 @Override 136 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 137 if (buf instanceof HFileBlock) { 138 if (buf.getSerializedLength() > MAX_SIZE) { 139 LOG.debug("Block of type {} with key {} is too large, size={}, max={}, will not cache", 140 buf.getClass(), cacheKey, buf.getSerializedLength(), MAX_SIZE); 141 notCachedCount.incrementAndGet(); 142 return; 143 } 144 client.set(cacheKey.toString(), MAX_TIME, (HFileBlock) buf, tc).addListener(f -> { 145 try { 146 f.get(); 147 cachedCount.incrementAndGet(); 148 } catch (Exception e) { 149 LOG.warn("Failed to cache block with key " + cacheKey, e); 150 cacheErrorCount.incrementAndGet(); 151 } 152 }); 153 } else { 154 LOG.debug("Can not cache Cacheables of type {} with key {}", buf.getClass(), cacheKey); 155 notCachedCount.incrementAndGet(); 156 } 157 } 158 159 @Override 160 public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat, 161 boolean updateCacheMetrics) { 162 // Assume that nothing is the block cache 163 HFileBlock result = null; 164 Span span = TraceUtil.getGlobalTracer().spanBuilder("MemcachedBlockCache.getBlock").startSpan(); 165 try (Scope traceScope = span.makeCurrent()) { 166 result = client.get(cacheKey.toString(), tc); 167 } catch (Exception e) { 168 // Catch a pretty broad set of exceptions to limit any changes in the memcache client 169 // and how it handles failures from leaking into the read path. 170 if ( 171 (e instanceof OperationTimeoutException) || ((e instanceof RuntimeException) 172 && (e.getCause() instanceof OperationTimeoutException)) 173 ) { 174 timeoutCount.incrementAndGet(); 175 if (LOG.isDebugEnabled()) { 176 LOG.debug("Timeout getting key " + cacheKey.toString(), e); 177 } 178 } else { 179 cacheErrorCount.incrementAndGet(); 180 if (LOG.isDebugEnabled()) { 181 LOG.debug("Exception getting key " + cacheKey.toString(), e); 182 } 183 } 184 result = null; 185 } finally { 186 span.end(); 187 if (updateCacheMetrics) { 188 if (result == null) { 189 cacheStats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 190 } else { 191 cacheStats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 192 } 193 } 194 } 195 return result; 196 } 197 198 @Override 199 public boolean evictBlock(BlockCacheKey cacheKey) { 200 try { 201 cacheStats.evict(); 202 return client.delete(cacheKey.toString()).get(); 203 } catch (InterruptedException e) { 204 LOG.warn("Error deleting " + cacheKey.toString(), e); 205 Thread.currentThread().interrupt(); 206 } catch (ExecutionException e) { 207 if (LOG.isDebugEnabled()) { 208 LOG.debug("Error deleting " + cacheKey.toString(), e); 209 } 210 } 211 return false; 212 } 213 214 /** 215 * This method does nothing so that memcached can handle all evictions. 216 */ 217 @Override 218 public int evictBlocksByHfileName(String hfileName) { 219 return 0; 220 } 221 222 @Override 223 public CacheStats getStats() { 224 return cacheStats; 225 } 226 227 @Override 228 public void shutdown() { 229 client.shutdown(); 230 this.scheduleThreadPool.shutdown(); 231 for (int i = 0; i < 10; i++) { 232 if (!this.scheduleThreadPool.isShutdown()) { 233 try { 234 Thread.sleep(10); 235 } catch (InterruptedException e) { 236 LOG.warn("Interrupted while sleeping"); 237 Thread.currentThread().interrupt(); 238 break; 239 } 240 } 241 } 242 if (!this.scheduleThreadPool.isShutdown()) { 243 List<Runnable> runnables = this.scheduleThreadPool.shutdownNow(); 244 LOG.debug("Still running " + runnables); 245 } 246 } 247 248 @Override 249 public long size() { 250 return 0; 251 } 252 253 @Override 254 public long getMaxSize() { 255 return 0; 256 } 257 258 @Override 259 public long getFreeSize() { 260 return 0; 261 } 262 263 @Override 264 public long getCurrentSize() { 265 return 0; 266 } 267 268 @Override 269 public long getCurrentDataSize() { 270 return 0; 271 } 272 273 @Override 274 public long getBlockCount() { 275 return 0; 276 } 277 278 @Override 279 public long getDataBlockCount() { 280 return 0; 281 } 282 283 @Override 284 public Iterator<CachedBlock> iterator() { 285 return new Iterator<CachedBlock>() { 286 @Override 287 public boolean hasNext() { 288 return false; 289 } 290 291 @Override 292 public CachedBlock next() { 293 throw new NoSuchElementException("MemcachedBlockCache can't iterate over blocks."); 294 } 295 296 @Override 297 public void remove() { 298 299 } 300 }; 301 } 302 303 @Override 304 public BlockCache[] getBlockCaches() { 305 return null; 306 } 307 308 /** 309 * Class to encode and decode an HFileBlock to and from memecached's resulting byte arrays. 310 */ 311 private static class HFileBlockTranscoder implements Transcoder<HFileBlock> { 312 313 @Override 314 public boolean asyncDecode(CachedData d) { 315 return false; 316 } 317 318 @Override 319 public CachedData encode(HFileBlock block) { 320 ByteBuffer bb = ByteBuffer.allocate(block.getSerializedLength()); 321 block.serialize(bb, true); 322 return new CachedData(0, bb.array(), CachedData.MAX_SIZE); 323 } 324 325 @Override 326 public HFileBlock decode(CachedData d) { 327 try { 328 ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(d.getData())); 329 return (HFileBlock) HFileBlock.BLOCK_DESERIALIZER.deserialize(buf, ByteBuffAllocator.HEAP); 330 } catch (IOException e) { 331 LOG.warn("Failed to deserialize data from memcached", e); 332 } 333 return null; 334 } 335 336 @Override 337 public int getMaxSize() { 338 return MAX_SIZE; 339 } 340 } 341 342 private static class StatisticsThread extends Thread { 343 344 private final MemcachedBlockCache c; 345 346 public StatisticsThread(MemcachedBlockCache c) { 347 super("MemcachedBlockCacheStats"); 348 setDaemon(true); 349 this.c = c; 350 } 351 352 @Override 353 public void run() { 354 c.logStats(); 355 } 356 357 } 358 359 public void logStats() { 360 LOG.info("cached=" + cachedCount.get() + ", notCached=" + notCachedCount.get() 361 + ", cacheErrors=" + cacheErrorCount.get() + ", timeouts=" + timeoutCount.get() + ", reads=" 362 + cacheStats.getRequestCount() + ", " + "hits=" + cacheStats.getHitCount() + ", hitRatio=" 363 + (cacheStats.getHitCount() == 0 364 ? "0" 365 : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", ")) 366 + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits=" 367 + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio=" 368 + (cacheStats.getHitCachingCount() == 0 369 ? "0," 370 : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", ")) 371 + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted=" 372 + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction()); 373 } 374 375}