001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.util.Map;
022import java.util.concurrent.ConcurrentSkipListMap;
023import java.util.concurrent.Future;
024import java.util.concurrent.RejectedExecutionException;
025import java.util.concurrent.ScheduledExecutorService;
026import java.util.concurrent.ScheduledFuture;
027import java.util.concurrent.ScheduledThreadPoolExecutor;
028import java.util.concurrent.ThreadFactory;
029import java.util.concurrent.ThreadLocalRandom;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.regex.Pattern;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.trace.TraceUtil;
038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043@InterfaceAudience.Private
044public final class PrefetchExecutor {
045
046  private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class);
047  /** Wait time in miliseconds before executing prefetch */
048  public static final String PREFETCH_DELAY = "hbase.hfile.prefetch.delay";
049  public static final String PREFETCH_DELAY_VARIATION = "hbase.hfile.prefetch.delay.variation";
050  public static final float PREFETCH_DELAY_VARIATION_DEFAULT_VALUE = 0.2f;
051
052  /** Futures for tracking block prefetch activity */
053  private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
054  /** Runnables for resetting the prefetch activity */
055  private static final Map<Path, Runnable> prefetchRunnable = new ConcurrentSkipListMap<>();
056  /** Executor pool shared among all HFiles for block prefetch */
057  private static final ScheduledExecutorService prefetchExecutorPool;
058  /** Delay before beginning prefetch */
059  private static int prefetchDelayMillis;
060  /** Variation in prefetch delay times, to mitigate stampedes */
061  private static float prefetchDelayVariation;
062  static {
063    // Consider doing this on demand with a configuration passed in rather
064    // than in a static initializer.
065    Configuration conf = HBaseConfiguration.create();
066    // 1s here for tests, consider 30s in hbase-default.xml
067    // Set to 0 for no delay
068    prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000);
069    prefetchDelayVariation =
070      conf.getFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
071    int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4);
072    prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads, new ThreadFactory() {
073      @Override
074      public Thread newThread(Runnable r) {
075        String name = "hfile-prefetch-" + EnvironmentEdgeManager.currentTime();
076        Thread t = new Thread(r, name);
077        t.setDaemon(true);
078        return t;
079      }
080    });
081  }
082
083  // TODO: We want HFile, which is where the blockcache lives, to handle
084  // prefetching of file blocks but the Store level is where path convention
085  // knowledge should be contained
086  private static final Pattern prefetchPathExclude =
087    Pattern.compile("(" + Path.SEPARATOR_CHAR + HConstants.HBASE_TEMP_DIRECTORY.replace(".", "\\.")
088      + Path.SEPARATOR_CHAR + ")|(" + Path.SEPARATOR_CHAR
089      + HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")");
090
091  public static void request(Path path, Runnable runnable) {
092    if (!prefetchPathExclude.matcher(path.toString()).find()) {
093      long delay;
094      if (prefetchDelayMillis > 0) {
095        delay = (long) ((prefetchDelayMillis * (1.0f - (prefetchDelayVariation / 2)))
096          + (prefetchDelayMillis * (prefetchDelayVariation / 2)
097            * ThreadLocalRandom.current().nextFloat()));
098      } else {
099        delay = 0;
100      }
101      try {
102        LOG.debug("Prefetch requested for {}, delay={} ms", path, delay);
103        final Runnable tracedRunnable =
104          TraceUtil.tracedRunnable(runnable, "PrefetchExecutor.request");
105        final Future<?> future =
106          prefetchExecutorPool.schedule(tracedRunnable, delay, TimeUnit.MILLISECONDS);
107        prefetchFutures.put(path, future);
108        prefetchRunnable.put(path, runnable);
109      } catch (RejectedExecutionException e) {
110        prefetchFutures.remove(path);
111        prefetchRunnable.remove(path);
112        LOG.warn("Prefetch request rejected for {}", path);
113      }
114    }
115  }
116
117  public static void complete(Path path) {
118    prefetchFutures.remove(path);
119    prefetchRunnable.remove(path);
120    if (LOG.isDebugEnabled()) {
121      LOG.debug("Prefetch completed for {}", path.getName());
122    }
123  }
124
125  public static void cancel(Path path) {
126    Future<?> future = prefetchFutures.get(path);
127    if (future != null) {
128      // ok to race with other cancellation attempts
129      future.cancel(true);
130      prefetchFutures.remove(path);
131      prefetchRunnable.remove(path);
132      LOG.debug("Prefetch cancelled for {}", path);
133    }
134  }
135
136  private PrefetchExecutor() {
137  }
138
139  public static boolean isCompleted(Path path) {
140    Future<?> future = prefetchFutures.get(path);
141    if (future != null) {
142      return future.isDone();
143    }
144    return true;
145  }
146
147  /* Visible for testing only */
148  @RestrictedApi(explanation = "Should only be called in tests", link = "",
149      allowedOnPath = ".*/src/test/.*")
150  static ScheduledExecutorService getExecutorPool() {
151    return prefetchExecutorPool;
152  }
153
154  @RestrictedApi(explanation = "Should only be called in tests", link = "",
155      allowedOnPath = ".*/src/test/.*")
156  static Map<Path, Future<?>> getPrefetchFutures() {
157    return prefetchFutures;
158  }
159
160  @RestrictedApi(explanation = "Should only be called in tests", link = "",
161      allowedOnPath = ".*/src/test/.*")
162  static Map<Path, Runnable> getPrefetchRunnable() {
163    return prefetchRunnable;
164  }
165
166  static boolean isPrefetchStarted() {
167    AtomicBoolean prefetchStarted = new AtomicBoolean(false);
168    for (Map.Entry<Path, Future<?>> entry : prefetchFutures.entrySet()) {
169      Path k = entry.getKey();
170      Future<?> v = entry.getValue();
171      ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k);
172      long waitTime = sf.getDelay(TimeUnit.MILLISECONDS);
173      if (waitTime < 0) {
174        // At this point prefetch is started
175        prefetchStarted.set(true);
176        break;
177      }
178    }
179    return prefetchStarted.get();
180  }
181
182  public static int getPrefetchDelay() {
183    return prefetchDelayMillis;
184  }
185
186  public static void loadConfiguration(Configuration conf) {
187    prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000);
188    prefetchDelayVariation =
189      conf.getFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
190    prefetchFutures.forEach((k, v) -> {
191      ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k);
192      if (!(sf.getDelay(TimeUnit.MILLISECONDS) > 0)) {
193        Runnable runnable = prefetchRunnable.get(k);
194        Future<?> future = prefetchFutures.get(k);
195        if (future != null) {
196          prefetchFutures.remove(k);
197          // ok to race with other cancellation attempts
198          boolean canceled = future.cancel(true);
199          LOG.debug("Prefetch {} for {}",
200            canceled ? "cancelled" : "cancel attempted but it was already finished", k);
201        }
202        if (runnable != null && future != null && future.isCancelled()) {
203          request(k, runnable);
204        }
205      }
206      LOG.debug("Reset called on Prefetch of file {} with delay {}, delay variation {}", k,
207        prefetchDelayMillis, prefetchDelayVariation);
208    });
209  }
210}