001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static java.util.stream.Collectors.toList; 021 022import java.io.IOException; 023import java.io.InterruptedIOException; 024import java.util.List; 025import java.util.concurrent.CompletableFuture; 026import java.util.concurrent.CompletionException; 027import java.util.concurrent.ExecutionException; 028import java.util.concurrent.Executor; 029import java.util.concurrent.Future; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.TimeoutException; 032import java.util.function.BiConsumer; 033import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * Helper class for processing futures. 040 */ 041@InterfaceAudience.Private 042public final class FutureUtils { 043 044 private static final Logger LOG = LoggerFactory.getLogger(FutureUtils.class); 045 046 private FutureUtils() { 047 } 048 049 /** 050 * This is method is used when you just want to add a listener to the given future. We will call 051 * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the 052 * {@code future}. Ignoring the return value of a Future is considered as a bad practice as it may 053 * suppress exceptions thrown from the code that completes the future, and this method will catch 054 * all the exception thrown from the {@code action} to catch possible code bugs. 055 * <p/> 056 * And the error phone check will always report FutureReturnValueIgnored because every method in 057 * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always 058 * have one future that has not been checked. So we introduce this method and add a suppress 059 * warnings annotation here. 060 */ 061 @SuppressWarnings("FutureReturnValueIgnored") 062 public static <T> void addListener(CompletableFuture<T> future, 063 BiConsumer<? super T, ? super Throwable> action) { 064 future.whenComplete((resp, error) -> { 065 try { 066 // See this post on stack overflow(shorten since the url is too long), 067 // https://s.apache.org/completionexception 068 // For a chain of CompleableFuture, only the first child CompletableFuture can get the 069 // original exception, others will get a CompletionException, which wraps the original 070 // exception. So here we unwrap it before passing it to the callback action. 071 action.accept(resp, unwrapCompletionException(error)); 072 } catch (Throwable t) { 073 LOG.error("Unexpected error caught when processing CompletableFuture", t); 074 } 075 }); 076 } 077 078 /** 079 * Almost the same with {@link #addListener(CompletableFuture, BiConsumer)} method above, the only 080 * exception is that we will call 081 * {@link CompletableFuture#whenCompleteAsync(BiConsumer, Executor)}. 082 * @see #addListener(CompletableFuture, BiConsumer) 083 */ 084 @SuppressWarnings("FutureReturnValueIgnored") 085 public static <T> void addListener(CompletableFuture<T> future, 086 BiConsumer<? super T, ? super Throwable> action, Executor executor) { 087 future.whenCompleteAsync((resp, error) -> { 088 try { 089 action.accept(resp, unwrapCompletionException(error)); 090 } catch (Throwable t) { 091 LOG.error("Unexpected error caught when processing CompletableFuture", t); 092 } 093 }, executor); 094 } 095 096 /** 097 * Log the error if the future indicates any failure. 098 */ 099 public static void consume(CompletableFuture<?> future) { 100 addListener(future, (r, e) -> { 101 if (e != null) { 102 LOG.warn("Async operation fails", e); 103 } 104 }); 105 } 106 107 /** 108 * Return a {@link CompletableFuture} which is same with the given {@code future}, but execute all 109 * the callbacks in the given {@code executor}. 110 */ 111 public static <T> CompletableFuture<T> wrapFuture(CompletableFuture<T> future, 112 Executor executor) { 113 CompletableFuture<T> wrappedFuture = new CompletableFuture<>(); 114 addListener(future, (r, e) -> { 115 if (e != null) { 116 wrappedFuture.completeExceptionally(e); 117 } else { 118 wrappedFuture.complete(r); 119 } 120 }, executor); 121 return wrappedFuture; 122 } 123 124 /** 125 * Get the cause of the {@link Throwable} if it is a {@link CompletionException}. 126 */ 127 public static Throwable unwrapCompletionException(Throwable error) { 128 if (error instanceof CompletionException) { 129 Throwable cause = error.getCause(); 130 if (cause != null) { 131 return cause; 132 } 133 } 134 return error; 135 } 136 137 // This method is used to record the stack trace that calling the FutureUtils.get method. As in 138 // async client, the retry will be done in the retry timer thread, so the exception we get from 139 // the CompletableFuture will have a stack trace starting from the root of the retry timer. If we 140 // just throw this exception out when calling future.get(by unwrapping the ExecutionException), 141 // the upper layer even can not know where is the exception thrown... 142 // See HBASE-22316. 143 private static void setStackTrace(Throwable error) { 144 StackTraceElement[] localStackTrace = Thread.currentThread().getStackTrace(); 145 StackTraceElement[] originalStackTrace = error.getStackTrace(); 146 StackTraceElement[] newStackTrace = 147 new StackTraceElement[localStackTrace.length + originalStackTrace.length + 1]; 148 System.arraycopy(localStackTrace, 0, newStackTrace, 0, localStackTrace.length); 149 newStackTrace[localStackTrace.length] = 150 new StackTraceElement("--------Future", "get--------", null, -1); 151 System.arraycopy(originalStackTrace, 0, newStackTrace, localStackTrace.length + 1, 152 originalStackTrace.length); 153 error.setStackTrace(newStackTrace); 154 } 155 156 /** 157 * If we could propagate the given {@code error} directly, we will fill the stack trace with the 158 * current thread's stack trace so it is easier to trace where is the exception thrown. If not, we 159 * will just create a new IOException and then throw it. 160 */ 161 public static IOException rethrow(Throwable error) throws IOException { 162 if (error instanceof IOException) { 163 setStackTrace(error); 164 throw (IOException) error; 165 } else if (error instanceof RuntimeException) { 166 setStackTrace(error); 167 throw (RuntimeException) error; 168 } else if (error instanceof Error) { 169 setStackTrace(error); 170 throw (Error) error; 171 } else { 172 throw new IOException(error); 173 } 174 } 175 176 /** 177 * A helper class for getting the result of a Future, and convert the error to an 178 * {@link IOException}. 179 */ 180 public static <T> T get(Future<T> future) throws IOException { 181 try { 182 return future.get(); 183 } catch (InterruptedException e) { 184 throw (IOException) new InterruptedIOException().initCause(e); 185 } catch (ExecutionException e) { 186 throw rethrow(e.getCause()); 187 } 188 } 189 190 /** 191 * A helper class for getting the result of a Future, and convert the error to an 192 * {@link IOException}. 193 */ 194 public static <T> T get(Future<T> future, long timeout, TimeUnit unit) throws IOException { 195 try { 196 return future.get(timeout, unit); 197 } catch (InterruptedException e) { 198 throw (IOException) new InterruptedIOException().initCause(e); 199 } catch (ExecutionException e) { 200 throw rethrow(e.getCause()); 201 } catch (TimeoutException e) { 202 throw new TimeoutIOException(e); 203 } 204 } 205 206 /** 207 * Returns a CompletableFuture that is already completed exceptionally with the given exception. 208 */ 209 public static <T> CompletableFuture<T> failedFuture(Throwable e) { 210 CompletableFuture<T> future = new CompletableFuture<>(); 211 future.completeExceptionally(e); 212 return future; 213 } 214 215 /** 216 * Returns a new CompletableFuture that is completed when all of the given CompletableFutures 217 * complete. If any of the given CompletableFutures complete exceptionally, then the returned 218 * CompletableFuture also does so, with a CompletionException holding this exception as its cause. 219 * Otherwise, the results of all given CompletableFutures could be obtained by the new returned 220 * CompletableFuture. 221 */ 222 public static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) { 223 return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) 224 .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList())); 225 } 226}