/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The concrete {@link RetryingCallerInterceptor} class that implements the preemptive fast fail
 * feature.
 * <p>
 * The motivation is as follows: when a large number of clients talk to a particular region server
 * and that region server goes down due to network problems, all of the clients may start to retry.
 * Many threads then follow pretty much the same path and end up sleeping, so the client either has
 * to create more threads to send new requests to other hbase machines, or block because it cannot
 * create any more threads.
 * <p>
 * In most cases clients prefer to have a bound on the number of threads that are created in order
 * to send requests to hbase, so this situation mostly results in client thread starvation.
 * <p>
 * To circumvent this problem, the approach taken here is to let one of the many threads that are
 * trying to contact the region server with connection problems keep retrying, and to let the other
 * threads get a {@link PreemptiveFastFailException} so that they can move on and take other
 * requests.
 * <p>
 * This gives the client more flexibility in the kind of action to take when the region server is
 * down: it can discard the requests and send a nack upstream faster, do an application level
 * retry, or buffer the requests so as to send them down to hbase later.
 */
@InterfaceAudience.Private
class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {

  private static final Logger LOG = LoggerFactory.getLogger(PreemptiveFastFailInterceptor.class);

  // amount of time to wait before we consider a server to be in fast fail
  // mode
  protected final long fastFailThresholdMilliSec;

  // Keeps track of failures when we cannot talk to a server. Helps in
  // fast failing clients if the server is down for a long time.
  protected final ConcurrentMap<ServerName, FailureInfo> repeatedFailuresMap =
    new ConcurrentHashMap<>();

  // We populate repeatedFailuresMap every time there is a failure. So, to
  // keep it from growing unbounded, we garbage collect the failure information
  // every cleanupInterval.
  protected final long failureMapCleanupIntervalMilliSec;

  protected volatile long lastFailureMapCleanupTimeMilliSec;

  // clear failure Info. Used to clean out all entries.
  // A safety valve, in case the client does not exit the
  // fast fail mode for any reason.
  private final long fastFailClearingTimeMilliSec;

  // Per-thread marker: true while the current thread is the one exclusively retrying against a
  // server that is in fast fail mode.
  private static final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode =
    new ThreadLocal<>();

  /**
   * Creates the interceptor, reading the fast fail threshold, the failure map cleanup interval
   * and the fast fail clearing duration from the given configuration.
   * @param conf configuration to read the timing settings from
   */
  public PreemptiveFastFailInterceptor(Configuration conf) {
    this.fastFailThresholdMilliSec = conf.getLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
      HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS_DEFAULT);
    this.failureMapCleanupIntervalMilliSec =
      conf.getLong(HConstants.HBASE_CLIENT_FAILURE_MAP_CLEANUP_INTERVAL_MS,
        HConstants.HBASE_CLIENT_FAILURE_MAP_CLEANUP_INTERVAL_MS_DEFAULT);
    this.fastFailClearingTimeMilliSec =
      conf.getLong(HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
        HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT);
    lastFailureMapCleanupTimeMilliSec = EnvironmentEdgeManager.currentTime();
  }

  /**
   * Called before an attempt is made. Records the server's current {@link FailureInfo} on the
   * context and, if the server is in fast fail mode and this thread is not already the retrying
   * thread, either claims the exclusive retry or fails fast.
   * @param context the per-call interceptor context
   * @throws PreemptiveFastFailException if the server is in fast fail mode and another thread
   *                                     already holds the exclusive retry
   */
  public void intercept(FastFailInterceptorContext context) throws PreemptiveFastFailException {
    context.setFailureInfo(repeatedFailuresMap.get(context.getServer()));
    if (inFastFailMode(context.getServer()) && !currentThreadInFastFailMode()) {
      // In Fast-fail mode, all but one thread will fast fail. Check
      // if we are that one chosen thread.
      context.setRetryDespiteFastFailMode(shouldRetryInspiteOfFastFail(context.getFailureInfo()));
      if (!context.isRetryDespiteFastFailMode()) { // we don't have to retry
        FailureInfo fInfo = context.getFailureInfo();
        LOG.debug("Throwing PFFE : {} tries : {}", fInfo, context.getTries());
        throw new PreemptiveFastFailException(fInfo.numConsecutiveFailures.get(),
          fInfo.timeOfFirstFailureMilliSec, fInfo.timeOfLatestAttemptMilliSec,
          context.getServer(), context.getGuaranteedClientSideOnly().isTrue());
      }
    }
    context.setDidTry(true);
  }

  /**
   * Called when an attempt failed with {@code t}; delegates to
   * {@link #handleThrowable(Throwable, ServerName, MutableBoolean, MutableBoolean)} using the
   * server and flags held by the context.
   * @param context the per-call interceptor context
   * @param t       the failure that was observed
   */
  public void handleFailure(FastFailInterceptorContext context, Throwable t) throws IOException {
    handleThrowable(t, context.getServer(), context.getCouldNotCommunicateWithServer(),
      context.getGuaranteedClientSideOnly());
  }

  /**
   * Called after an attempt completes; updates (or clears) the failure bookkeeping for the
   * context's server based on the outcome recorded in the context.
   * @param context the per-call interceptor context
   */
  public void updateFailureInfo(FastFailInterceptorContext context) {
    updateFailureInfoForServer(context.getServer(), context.getFailureInfo(), context.didTry(),
      context.getCouldNotCommunicateWithServer().booleanValue(),
      context.isRetryDespiteFastFailMode());
  }

  /**
   * Records a failure encountered when communicating with a server: creates the
   * {@link FailureInfo} entry in repeatedFailuresMap if it is absent, stamps the time of the
   * latest attempt and increments the consecutive failure count. Does nothing if either argument
   * is null.
   * @param serverName the server that could not be reached
   * @param t          the throwable that was observed
   */
  protected void handleFailureToServer(ServerName serverName, Throwable t) {
    if (serverName == null || t == null) {
      return;
    }
    long currentTime = EnvironmentEdgeManager.currentTime();
    FailureInfo fInfo =
      computeIfAbsent(repeatedFailuresMap, serverName, () -> new FailureInfo(currentTime));
    fInfo.timeOfLatestAttemptMilliSec = currentTime;
    fInfo.numConsecutiveFailures.incrementAndGet();
  }

  /**
   * Classifies a throwable. If, after PFFE translation, it is a non-remote connection exception,
   * marks the call as "could not communicate", flags whether the failure is guaranteed to be
   * client-side only (i.e. it is not a {@link CallTimeoutException}) and records the failure
   * against the server.
   * @param t1                            the throwable that was observed
   * @param serverName                    the server the call was sent to
   * @param couldNotCommunicateWithServer out-flag set to true on a local connection failure
   * @param guaranteedClientSideOnly      out-flag set on a local connection failure
   */
  public void handleThrowable(Throwable t1, ServerName serverName,
    MutableBoolean couldNotCommunicateWithServer, MutableBoolean guaranteedClientSideOnly)
    throws IOException {
    Throwable t2 = ClientExceptionsUtil.translatePFFE(t1);
    boolean isLocalException = !(t2 instanceof RemoteException);

    if (isLocalException && ClientExceptionsUtil.isConnectionException(t2)) {
      couldNotCommunicateWithServer.setValue(true);
      guaranteedClientSideOnly.setValue(!(t2 instanceof CallTimeoutException));
      handleFailureToServer(serverName, t2);
    }
  }

  /**
   * Occasionally cleans up unused information in repeatedFailuresMap. repeatedFailuresMap stores
   * the failure information for all remote hosts that had failures. In order to avoid these from
   * growing indefinitely, occasionallyCleanupFailureInformation() will clear these up once every
   * cleanupInterval ms.
   */
  protected void occasionallyCleanupFailureInformation() {
    long now = EnvironmentEdgeManager.currentTime();
    if (now <= lastFailureMapCleanupTimeMilliSec + failureMapCleanupIntervalMilliSec) {
      return;
    }

    // remove entries that haven't been attempted in a while
    // No synchronization needed. It is okay if multiple threads try to
    // remove the entry again and again from a concurrent hash map.
    StringBuilder sb = new StringBuilder();
    for (Entry<ServerName, FailureInfo> entry : repeatedFailuresMap.entrySet()) {
      ServerName server = entry.getKey();
      FailureInfo fInfo = entry.getValue();
      if (now > fInfo.timeOfLatestAttemptMilliSec + failureMapCleanupIntervalMilliSec) {
        // no recent failures
        repeatedFailuresMap.remove(server);
      } else if (now > fInfo.timeOfFirstFailureMilliSec + this.fastFailClearingTimeMilliSec) {
        // been failing for a long time
        LOG.error("{} been failing for a long time. clearing out.{}", server, fInfo);
        repeatedFailuresMap.remove(server);
      } else {
        sb.append(server.toString()).append(" failing ").append(fInfo.toString()).append("\n");
      }
    }
    if (sb.length() > 0) {
      LOG.warn("Preemptive failure enabled for : {}", sb);
    }
    lastFailureMapCleanupTimeMilliSec = now;
  }

  /**
   * Checks to see if we are in the Fast fail mode for requests to the server. If a client is unable
   * to contact a server for more than fastFailThresholdMilliSec the client will get into fast fail
   * mode.
   * @return true if the client is in fast fail mode for the server.
   */
  private boolean inFastFailMode(ServerName server) {
    FailureInfo fInfo = repeatedFailuresMap.get(server);
    // if fInfo is null --> The server is considered good.
    // If the server is bad, wait long enough to believe that the server is
    // down.
    return fInfo != null && EnvironmentEdgeManager.currentTime()
        > (fInfo.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec);
  }

  /**
   * Checks to see if the current thread is already in FastFail mode for *some* server.
   * @return true, if the thread is already in FF mode.
   */
  private boolean currentThreadInFastFailMode() {
    MutableBoolean retrying = threadRetryingInFastFailMode.get();
    return retrying != null && retrying.booleanValue();
  }

  /**
   * Check to see if the client should try to connect to the server, in spite of knowing that it is
   * in the fast fail mode. The idea here is that we want just one client thread to be actively
   * trying to reconnect, while all the other threads trying to reach the server will short circuit.
   * @return true if the client should try to connect to the server.
   */
  protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
    // We believe that the server is down, but we want to have just one client
    // actively trying to connect. If we are the chosen one, we will retry
    // and not throw an exception.
    if (fInfo == null || !fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false, true)) {
      return false;
    }
    MutableBoolean threadAlreadyInFF = threadRetryingInFastFailMode.get();
    if (threadAlreadyInFF == null) {
      threadAlreadyInFF = new MutableBoolean();
      threadRetryingInFastFailMode.set(threadAlreadyInFF);
    }
    threadAlreadyInFF.setValue(true);
    return true;
  }

  /**
   * Updates the failure info for a particular server after an attempt to contact it. On success
   * the server's entry is removed from repeatedFailuresMap; on a communication failure the time
   * of the latest attempt is refreshed. In both cases, if this call was the exclusive retry made
   * despite fast fail mode, the exclusive-retry claim is released. Finally triggers the periodic
   * cleanup of the failure map. Does nothing if the server or failure info is null, or if no
   * attempt was actually made.
   */
  private void updateFailureInfoForServer(ServerName server, FailureInfo fInfo, boolean didTry,
    boolean couldNotCommunicate, boolean retryDespiteFastFailMode) {
    if (server == null || fInfo == null || !didTry) {
      return;
    }

    if (!couldNotCommunicate) {
      // We were able to connect to the server, so reset the failure information.
      LOG.info("Clearing out PFFE for server {}", server);
      repeatedFailuresMap.remove(server);
    } else {
      // update time of last attempt
      fInfo.timeOfLatestAttemptMilliSec = EnvironmentEdgeManager.currentTime();
    }

    // Release the lock if we were retrying in spite of FastFail. This must happen on success as
    // well as on failure: otherwise the thread-local marker stays set and this thread would skip
    // the fast fail check in intercept() for every server from then on.
    if (retryDespiteFastFailMode) {
      releaseExclusiveRetry(fInfo);
    }

    occasionallyCleanupFailureInformation();
  }

  /**
   * Releases the exclusive-retry flag on the given failure info and clears the current thread's
   * "retrying in fast fail mode" marker, if one was ever set on this thread.
   */
  private static void releaseExclusiveRetry(FailureInfo fInfo) {
    fInfo.exclusivelyRetringInspiteOfFastFail.set(false);
    MutableBoolean retrying = threadRetryingInFastFailMode.get();
    if (retrying != null) {
      retrying.setValue(false);
    }
  }

  @Override
  public void intercept(RetryingCallerInterceptorContext context)
    throws PreemptiveFastFailException {
    if (context instanceof FastFailInterceptorContext) {
      intercept((FastFailInterceptorContext) context);
    }
  }

  @Override
  public void handleFailure(RetryingCallerInterceptorContext context, Throwable t)
    throws IOException {
    if (context instanceof FastFailInterceptorContext) {
      handleFailure((FastFailInterceptorContext) context, t);
    }
  }

  @Override
  public void updateFailureInfo(RetryingCallerInterceptorContext context) {
    if (context instanceof FastFailInterceptorContext) {
      updateFailureInfo((FastFailInterceptorContext) context);
    }
  }

  @Override
  public RetryingCallerInterceptorContext createEmptyContext() {
    return new FastFailInterceptorContext();
  }

  /**
   * @return true if repeatedFailuresMap currently holds an entry for the given server.
   */
  protected boolean isServerInFailureMap(ServerName serverName) {
    return this.repeatedFailuresMap.containsKey(serverName);
  }

  @Override
  public String toString() {
    return "PreemptiveFastFailInterceptor";
  }
}