001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.util.Map; 022import java.util.concurrent.ConcurrentSkipListMap; 023import java.util.concurrent.Future; 024import java.util.concurrent.RejectedExecutionException; 025import java.util.concurrent.ScheduledExecutorService; 026import java.util.concurrent.ScheduledFuture; 027import java.util.concurrent.ScheduledThreadPoolExecutor; 028import java.util.concurrent.ThreadFactory; 029import java.util.concurrent.ThreadLocalRandom; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.regex.Pattern; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.HBaseConfiguration; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.trace.TraceUtil; 038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 039import org.apache.yetus.audience.InterfaceAudience; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043@InterfaceAudience.Private 044public final class PrefetchExecutor { 045 046 private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class); 047 /** Wait time in miliseconds before executing prefetch */ 048 public static final String PREFETCH_DELAY = "hbase.hfile.prefetch.delay"; 049 public static final String PREFETCH_DELAY_VARIATION = "hbase.hfile.prefetch.delay.variation"; 050 public static final float PREFETCH_DELAY_VARIATION_DEFAULT_VALUE = 0.2f; 051 052 /** Futures for tracking block prefetch activity */ 053 private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>(); 054 /** Runnables for resetting the prefetch activity */ 055 private static final Map<Path, Runnable> prefetchRunnable = new ConcurrentSkipListMap<>(); 056 /** Executor pool shared among all HFiles for block prefetch */ 057 private static final ScheduledExecutorService prefetchExecutorPool; 058 /** Delay before beginning prefetch */ 059 private static int prefetchDelayMillis; 060 /** Variation in prefetch delay times, to mitigate stampedes */ 061 private static float prefetchDelayVariation; 062 static { 063 // Consider doing this on demand with a configuration passed in rather 064 // than in a static initializer. 065 Configuration conf = HBaseConfiguration.create(); 066 // 1s here for tests, consider 30s in hbase-default.xml 067 // Set to 0 for no delay 068 prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000); 069 prefetchDelayVariation = 070 conf.getFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE); 071 int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4); 072 prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads, new ThreadFactory() { 073 @Override 074 public Thread newThread(Runnable r) { 075 String name = "hfile-prefetch-" + EnvironmentEdgeManager.currentTime(); 076 Thread t = new Thread(r, name); 077 t.setDaemon(true); 078 return t; 079 } 080 }); 081 } 082 083 // TODO: We want HFile, which is where the blockcache lives, to handle 084 // prefetching of file blocks but the Store level is where path convention 085 // knowledge should be contained 086 private static final Pattern prefetchPathExclude = 087 Pattern.compile("(" + Path.SEPARATOR_CHAR + HConstants.HBASE_TEMP_DIRECTORY.replace(".", "\\.") 088 + Path.SEPARATOR_CHAR + ")|(" + Path.SEPARATOR_CHAR 089 + HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")"); 090 091 public static void request(Path path, Runnable runnable) { 092 if (!prefetchPathExclude.matcher(path.toString()).find()) { 093 long delay; 094 if (prefetchDelayMillis > 0) { 095 delay = (long) ((prefetchDelayMillis * (1.0f - (prefetchDelayVariation / 2))) 096 + (prefetchDelayMillis * (prefetchDelayVariation / 2) 097 * ThreadLocalRandom.current().nextFloat())); 098 } else { 099 delay = 0; 100 } 101 try { 102 LOG.debug("Prefetch requested for {}, delay={} ms", path, delay); 103 final Runnable tracedRunnable = 104 TraceUtil.tracedRunnable(runnable, "PrefetchExecutor.request"); 105 final Future<?> future = 106 prefetchExecutorPool.schedule(tracedRunnable, delay, TimeUnit.MILLISECONDS); 107 prefetchFutures.put(path, future); 108 prefetchRunnable.put(path, runnable); 109 } catch (RejectedExecutionException e) { 110 prefetchFutures.remove(path); 111 prefetchRunnable.remove(path); 112 LOG.warn("Prefetch request rejected for {}", path); 113 } 114 } 115 } 116 117 public static void complete(Path path) { 118 prefetchFutures.remove(path); 119 prefetchRunnable.remove(path); 120 if (LOG.isDebugEnabled()) { 121 LOG.debug("Prefetch completed for {}", path.getName()); 122 } 123 } 124 125 public static void cancel(Path path) { 126 Future<?> future = prefetchFutures.get(path); 127 if (future != null) { 128 // ok to race with other cancellation attempts 129 future.cancel(true); 130 prefetchFutures.remove(path); 131 prefetchRunnable.remove(path); 132 LOG.debug("Prefetch cancelled for {}", path); 133 } 134 } 135 136 public static void interrupt(Path path) { 137 Future<?> future = prefetchFutures.get(path); 138 if (future != null) { 139 prefetchFutures.remove(path); 140 // ok to race with other cancellation attempts 141 future.cancel(true); 142 LOG.debug("Prefetch cancelled for {}", path); 143 } 144 } 145 146 private PrefetchExecutor() { 147 } 148 149 public static boolean isCompleted(Path path) { 150 Future<?> future = prefetchFutures.get(path); 151 if (future != null) { 152 return future.isDone(); 153 } 154 return true; 155 } 156 157 /* Visible for testing only */ 158 @RestrictedApi(explanation = "Should only be called in tests", link = "", 159 allowedOnPath = ".*/src/test/.*") 160 static ScheduledExecutorService getExecutorPool() { 161 return prefetchExecutorPool; 162 } 163 164 @RestrictedApi(explanation = "Should only be called in tests", link = "", 165 allowedOnPath = ".*/src/test/.*") 166 static Map<Path, Future<?>> getPrefetchFutures() { 167 return prefetchFutures; 168 } 169 170 @RestrictedApi(explanation = "Should only be called in tests", link = "", 171 allowedOnPath = ".*/src/test/.*") 172 static Map<Path, Runnable> getPrefetchRunnable() { 173 return prefetchRunnable; 174 } 175 176 static boolean isPrefetchStarted() { 177 AtomicBoolean prefetchStarted = new AtomicBoolean(false); 178 for (Map.Entry<Path, Future<?>> entry : prefetchFutures.entrySet()) { 179 Path k = entry.getKey(); 180 Future<?> v = entry.getValue(); 181 ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k); 182 long waitTime = sf.getDelay(TimeUnit.MILLISECONDS); 183 if (waitTime < 0) { 184 // At this point prefetch is started 185 prefetchStarted.set(true); 186 break; 187 } 188 } 189 return prefetchStarted.get(); 190 } 191 192 public static int getPrefetchDelay() { 193 return prefetchDelayMillis; 194 } 195 196 public static void loadConfiguration(Configuration conf) { 197 prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000); 198 prefetchDelayVariation = 199 conf.getFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE); 200 prefetchFutures.forEach((k, v) -> { 201 ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k); 202 if (!(sf.getDelay(TimeUnit.MILLISECONDS) > 0)) { 203 // the thread is still pending delay expiration and has not started to run yet, so can be 204 // re-scheduled at no cost. 205 interrupt(k); 206 request(k, prefetchRunnable.get(k)); 207 } 208 LOG.debug("Reset called on Prefetch of file {} with delay {}, delay variation {}", k, 209 prefetchDelayMillis, prefetchDelayVariation); 210 }); 211 } 212}