001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.util.Map; 022import java.util.concurrent.ConcurrentSkipListMap; 023import java.util.concurrent.Future; 024import java.util.concurrent.RejectedExecutionException; 025import java.util.concurrent.ScheduledExecutorService; 026import java.util.concurrent.ScheduledFuture; 027import java.util.concurrent.ScheduledThreadPoolExecutor; 028import java.util.concurrent.ThreadFactory; 029import java.util.concurrent.ThreadLocalRandom; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.regex.Pattern; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.HBaseConfiguration; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.trace.TraceUtil; 038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 039import org.apache.yetus.audience.InterfaceAudience; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043@InterfaceAudience.Private 044public final class PrefetchExecutor { 045 046 private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class); 047 /** Wait time in miliseconds before executing prefetch */ 048 public static final String PREFETCH_DELAY = "hbase.hfile.prefetch.delay"; 049 public static final String PREFETCH_DELAY_VARIATION = "hbase.hfile.prefetch.delay.variation"; 050 public static final float PREFETCH_DELAY_VARIATION_DEFAULT_VALUE = 0.2f; 051 052 /** Futures for tracking block prefetch activity */ 053 private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>(); 054 /** Runnables for resetting the prefetch activity */ 055 private static final Map<Path, Runnable> prefetchRunnable = new ConcurrentSkipListMap<>(); 056 /** Executor pool shared among all HFiles for block prefetch */ 057 private static final ScheduledExecutorService prefetchExecutorPool; 058 /** Delay before beginning prefetch */ 059 private static int prefetchDelayMillis; 060 /** Variation in prefetch delay times, to mitigate stampedes */ 061 private static float prefetchDelayVariation; 062 static { 063 // Consider doing this on demand with a configuration passed in rather 064 // than in a static initializer. 065 Configuration conf = HBaseConfiguration.create(); 066 // 1s here for tests, consider 30s in hbase-default.xml 067 // Set to 0 for no delay 068 prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000); 069 prefetchDelayVariation = 070 conf.getFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE); 071 int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4); 072 prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads, new ThreadFactory() { 073 @Override 074 public Thread newThread(Runnable r) { 075 String name = "hfile-prefetch-" + EnvironmentEdgeManager.currentTime(); 076 Thread t = new Thread(r, name); 077 t.setDaemon(true); 078 return t; 079 } 080 }); 081 } 082 083 // TODO: We want HFile, which is where the blockcache lives, to handle 084 // prefetching of file blocks but the Store level is where path convention 085 // knowledge should be contained 086 private static final Pattern prefetchPathExclude = 087 Pattern.compile("(" + Path.SEPARATOR_CHAR + HConstants.HBASE_TEMP_DIRECTORY.replace(".", "\\.") 088 + Path.SEPARATOR_CHAR + ")|(" + Path.SEPARATOR_CHAR 089 + HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")"); 090 091 public static void request(Path path, Runnable runnable) { 092 if (!prefetchPathExclude.matcher(path.toString()).find()) { 093 long delay; 094 if (prefetchDelayMillis > 0) { 095 delay = (long) ((prefetchDelayMillis * (1.0f - (prefetchDelayVariation / 2))) 096 + (prefetchDelayMillis * (prefetchDelayVariation / 2) 097 * ThreadLocalRandom.current().nextFloat())); 098 } else { 099 delay = 0; 100 } 101 try { 102 LOG.debug("Prefetch requested for {}, delay={} ms", path, delay); 103 final Runnable tracedRunnable = 104 TraceUtil.tracedRunnable(runnable, "PrefetchExecutor.request"); 105 final Future<?> future = 106 prefetchExecutorPool.schedule(tracedRunnable, delay, TimeUnit.MILLISECONDS); 107 prefetchFutures.put(path, future); 108 prefetchRunnable.put(path, runnable); 109 } catch (RejectedExecutionException e) { 110 prefetchFutures.remove(path); 111 prefetchRunnable.remove(path); 112 LOG.warn("Prefetch request rejected for {}", path); 113 } 114 } 115 } 116 117 public static void complete(Path path) { 118 prefetchFutures.remove(path); 119 prefetchRunnable.remove(path); 120 if (LOG.isDebugEnabled()) { 121 LOG.debug("Prefetch completed for {}", path.getName()); 122 } 123 } 124 125 public static void cancel(Path path) { 126 Future<?> future = prefetchFutures.get(path); 127 if (future != null) { 128 // ok to race with other cancellation attempts 129 future.cancel(true); 130 prefetchFutures.remove(path); 131 prefetchRunnable.remove(path); 132 LOG.debug("Prefetch cancelled for {}", path); 133 } 134 } 135 136 private PrefetchExecutor() { 137 } 138 139 public static boolean isCompleted(Path path) { 140 Future<?> future = prefetchFutures.get(path); 141 if (future != null) { 142 return future.isDone(); 143 } 144 return true; 145 } 146 147 /* Visible for testing only */ 148 @RestrictedApi(explanation = "Should only be called in tests", link = "", 149 allowedOnPath = ".*/src/test/.*") 150 static ScheduledExecutorService getExecutorPool() { 151 return prefetchExecutorPool; 152 } 153 154 @RestrictedApi(explanation = "Should only be called in tests", link = "", 155 allowedOnPath = ".*/src/test/.*") 156 static Map<Path, Future<?>> getPrefetchFutures() { 157 return prefetchFutures; 158 } 159 160 @RestrictedApi(explanation = "Should only be called in tests", link = "", 161 allowedOnPath = ".*/src/test/.*") 162 static Map<Path, Runnable> getPrefetchRunnable() { 163 return prefetchRunnable; 164 } 165 166 static boolean isPrefetchStarted() { 167 AtomicBoolean prefetchStarted = new AtomicBoolean(false); 168 for (Map.Entry<Path, Future<?>> entry : prefetchFutures.entrySet()) { 169 Path k = entry.getKey(); 170 Future<?> v = entry.getValue(); 171 ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k); 172 long waitTime = sf.getDelay(TimeUnit.MILLISECONDS); 173 if (waitTime < 0) { 174 // At this point prefetch is started 175 prefetchStarted.set(true); 176 break; 177 } 178 } 179 return prefetchStarted.get(); 180 } 181 182 public static int getPrefetchDelay() { 183 return prefetchDelayMillis; 184 } 185 186 public static void loadConfiguration(Configuration conf) { 187 prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000); 188 prefetchDelayVariation = 189 conf.getFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE); 190 prefetchFutures.forEach((k, v) -> { 191 ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k); 192 if (!(sf.getDelay(TimeUnit.MILLISECONDS) > 0)) { 193 Runnable runnable = prefetchRunnable.get(k); 194 Future<?> future = prefetchFutures.get(k); 195 if (future != null) { 196 prefetchFutures.remove(k); 197 // ok to race with other cancellation attempts 198 boolean canceled = future.cancel(true); 199 LOG.debug("Prefetch {} for {}", 200 canceled ? "cancelled" : "cancel attempted but it was already finished", k); 201 } 202 if (runnable != null && future != null && future.isCancelled()) { 203 request(k, runnable); 204 } 205 } 206 LOG.debug("Reset called on Prefetch of file {} with delay {}, delay variation {}", k, 207 prefetchDelayMillis, prefetchDelayVariation); 208 }); 209 } 210}