001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static java.util.stream.Collectors.toList;
021
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.util.List;
025import java.util.concurrent.CompletableFuture;
026import java.util.concurrent.CompletionException;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.Executor;
029import java.util.concurrent.Future;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.TimeoutException;
032import java.util.function.BiConsumer;
033import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * Helper class for processing futures.
040 */
041@InterfaceAudience.Private
042public final class FutureUtils {
043
044  private static final Logger LOG = LoggerFactory.getLogger(FutureUtils.class);
045
046  private FutureUtils() {
047  }
048
049  /**
050   * This is method is used when you just want to add a listener to the given future. We will call
051   * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the
052   * {@code future}. Ignoring the return value of a Future is considered as a bad practice as it may
053   * suppress exceptions thrown from the code that completes the future, and this method will catch
054   * all the exception thrown from the {@code action} to catch possible code bugs.
055   * <p/>
056   * And the error phone check will always report FutureReturnValueIgnored because every method in
057   * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always
058   * have one future that has not been checked. So we introduce this method and add a suppress
059   * warnings annotation here.
060   */
061  @SuppressWarnings("FutureReturnValueIgnored")
062  public static <T> void addListener(CompletableFuture<T> future,
063    BiConsumer<? super T, ? super Throwable> action) {
064    future.whenComplete((resp, error) -> {
065      try {
066        // See this post on stack overflow(shorten since the url is too long),
067        // https://s.apache.org/completionexception
068        // For a chain of CompleableFuture, only the first child CompletableFuture can get the
069        // original exception, others will get a CompletionException, which wraps the original
070        // exception. So here we unwrap it before passing it to the callback action.
071        action.accept(resp, unwrapCompletionException(error));
072      } catch (Throwable t) {
073        LOG.error("Unexpected error caught when processing CompletableFuture", t);
074      }
075    });
076  }
077
078  /**
079   * Almost the same with {@link #addListener(CompletableFuture, BiConsumer)} method above, the only
080   * exception is that we will call
081   * {@link CompletableFuture#whenCompleteAsync(BiConsumer, Executor)}.
082   * @see #addListener(CompletableFuture, BiConsumer)
083   */
084  @SuppressWarnings("FutureReturnValueIgnored")
085  public static <T> void addListener(CompletableFuture<T> future,
086    BiConsumer<? super T, ? super Throwable> action, Executor executor) {
087    future.whenCompleteAsync((resp, error) -> {
088      try {
089        action.accept(resp, unwrapCompletionException(error));
090      } catch (Throwable t) {
091        LOG.error("Unexpected error caught when processing CompletableFuture", t);
092      }
093    }, executor);
094  }
095
096  /**
097   * Log the error if the future indicates any failure.
098   */
099  public static void consume(CompletableFuture<?> future) {
100    addListener(future, (r, e) -> {
101      if (e != null) {
102        LOG.warn("Async operation fails", e);
103      }
104    });
105  }
106
107  /**
108   * Return a {@link CompletableFuture} which is same with the given {@code future}, but execute all
109   * the callbacks in the given {@code executor}.
110   */
111  public static <T> CompletableFuture<T> wrapFuture(CompletableFuture<T> future,
112    Executor executor) {
113    CompletableFuture<T> wrappedFuture = new CompletableFuture<>();
114    addListener(future, (r, e) -> {
115      if (e != null) {
116        wrappedFuture.completeExceptionally(e);
117      } else {
118        wrappedFuture.complete(r);
119      }
120    }, executor);
121    return wrappedFuture;
122  }
123
124  /**
125   * Get the cause of the {@link Throwable} if it is a {@link CompletionException}.
126   */
127  public static Throwable unwrapCompletionException(Throwable error) {
128    if (error instanceof CompletionException) {
129      Throwable cause = error.getCause();
130      if (cause != null) {
131        return cause;
132      }
133    }
134    return error;
135  }
136
137  // This method is used to record the stack trace that calling the FutureUtils.get method. As in
138  // async client, the retry will be done in the retry timer thread, so the exception we get from
139  // the CompletableFuture will have a stack trace starting from the root of the retry timer. If we
140  // just throw this exception out when calling future.get(by unwrapping the ExecutionException),
141  // the upper layer even can not know where is the exception thrown...
142  // See HBASE-22316.
143  private static void setStackTrace(Throwable error) {
144    StackTraceElement[] localStackTrace = Thread.currentThread().getStackTrace();
145    StackTraceElement[] originalStackTrace = error.getStackTrace();
146    StackTraceElement[] newStackTrace =
147      new StackTraceElement[localStackTrace.length + originalStackTrace.length + 1];
148    System.arraycopy(localStackTrace, 0, newStackTrace, 0, localStackTrace.length);
149    newStackTrace[localStackTrace.length] =
150      new StackTraceElement("--------Future", "get--------", null, -1);
151    System.arraycopy(originalStackTrace, 0, newStackTrace, localStackTrace.length + 1,
152      originalStackTrace.length);
153    error.setStackTrace(newStackTrace);
154  }
155
156  /**
157   * If we could propagate the given {@code error} directly, we will fill the stack trace with the
158   * current thread's stack trace so it is easier to trace where is the exception thrown. If not, we
159   * will just create a new IOException and then throw it.
160   */
161  public static IOException rethrow(Throwable error) throws IOException {
162    if (error instanceof IOException) {
163      setStackTrace(error);
164      throw (IOException) error;
165    } else if (error instanceof RuntimeException) {
166      setStackTrace(error);
167      throw (RuntimeException) error;
168    } else if (error instanceof Error) {
169      setStackTrace(error);
170      throw (Error) error;
171    } else {
172      throw new IOException(error);
173    }
174  }
175
176  /**
177   * A helper class for getting the result of a Future, and convert the error to an
178   * {@link IOException}.
179   */
180  public static <T> T get(Future<T> future) throws IOException {
181    try {
182      return future.get();
183    } catch (InterruptedException e) {
184      throw (IOException) new InterruptedIOException().initCause(e);
185    } catch (ExecutionException e) {
186      throw rethrow(e.getCause());
187    }
188  }
189
190  /**
191   * A helper class for getting the result of a Future, and convert the error to an
192   * {@link IOException}.
193   */
194  public static <T> T get(Future<T> future, long timeout, TimeUnit unit) throws IOException {
195    try {
196      return future.get(timeout, unit);
197    } catch (InterruptedException e) {
198      throw (IOException) new InterruptedIOException().initCause(e);
199    } catch (ExecutionException e) {
200      throw rethrow(e.getCause());
201    } catch (TimeoutException e) {
202      throw new TimeoutIOException(e);
203    }
204  }
205
206  /**
207   * Returns a CompletableFuture that is already completed exceptionally with the given exception.
208   */
209  public static <T> CompletableFuture<T> failedFuture(Throwable e) {
210    CompletableFuture<T> future = new CompletableFuture<>();
211    future.completeExceptionally(e);
212    return future;
213  }
214
215  /**
216   * Returns a new CompletableFuture that is completed when all of the given CompletableFutures
217   * complete. If any of the given CompletableFutures complete exceptionally, then the returned
218   * CompletableFuture also does so, with a CompletionException holding this exception as its cause.
219   * Otherwise, the results of all given CompletableFutures could be obtained by the new returned
220   * CompletableFuture.
221   */
222  public static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
223    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
224      .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
225  }
226}