/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * Caller that goes to a replica if the primary region does not answer within a configurable
 * timeout. If the timeout is reached, it calls all the secondary replicas and returns the first
 * answer. If the answer comes from one of the secondary replicas, it will be marked as stale.
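 * <p>
 * This class is internal ({@code @InterfaceAudience.Private}); client code reaches it indirectly
 * by issuing a timeline-consistent {@code Get}. A rough usage sketch follows; the table name, row
 * and timeout value are placeholders, and the configuration key is assumed to be the client-side
 * primary-call timeout consumed upstream of this class:
 * </p>
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * // Assumed key: how long the primary replica gets before the read is hedged (in microseconds).
 * conf.setInt("hbase.client.primaryCallTimeout.get", 10000);
 * try (Connection connection = ConnectionFactory.createConnection(conf);
 *   Table table = connection.getTable(TableName.valueOf("my_table"))) {
 *   Get get = new Get(Bytes.toBytes("my_row"));
 *   get.setConsistency(Consistency.TIMELINE); // allow the read to be served by a secondary
 *   Result result = table.get(get);
 *   if (result.isStale()) {
 *     // the answer came from a secondary replica
 *   }
 * } // checked IOExceptions omitted for brevity
 * }</pre>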
 */
@InterfaceAudience.Private
public class RpcRetryingCallerWithReadReplicas {
  private static final Logger LOG =
    LoggerFactory.getLogger(RpcRetryingCallerWithReadReplicas.class);

  protected final ExecutorService pool;
  protected final ClusterConnection cConnection;
  protected final Configuration conf;
  protected final Get get;
  protected final TableName tableName;
  protected final int timeBeforeReplicas;
  private final int operationTimeout;
  private final int rpcTimeout;
  private final int retries;
  private final RpcControllerFactory rpcControllerFactory;
  private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
  private final Map<String, byte[]> requestAttributes;

  public RpcRetryingCallerWithReadReplicas(RpcControllerFactory rpcControllerFactory,
    TableName tableName, ClusterConnection cConnection, final Get get, ExecutorService pool,
    int retries, int operationTimeout, int rpcTimeout, int timeBeforeReplicas,
    Map<String, byte[]> requestAttributes) {
    this.rpcControllerFactory = rpcControllerFactory;
    this.tableName = tableName;
    this.cConnection = cConnection;
    this.conf = cConnection.getConfiguration();
    this.get = get;
    this.pool = pool;
    this.retries = retries;
    this.operationTimeout = operationTimeout;
    this.rpcTimeout = rpcTimeout;
    this.timeBeforeReplicas = timeBeforeReplicas;
    this.rpcRetryingCallerFactory =
      new RpcRetryingCallerFactory(conf, cConnection.getConnectionConfiguration());
    this.requestAttributes = requestAttributes;
  }

  /**
   * A RegionServerCallable that takes into account the replicas, i.e.:
   * <ul>
   * <li>the call can be on any replica;</li>
   * <li>we need to stop retrying when the call is completed;</li>
   * <li>we can be interrupted.</li>
   * </ul>
   */
  class ReplicaRegionServerCallable extends CancellableRegionServerCallable<Result> {
    final int id;

    public ReplicaRegionServerCallable(int id, HRegionLocation location) {
      super(RpcRetryingCallerWithReadReplicas.this.cConnection,
        RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(),
        rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker(), PRIORITY_UNSET,
        RpcRetryingCallerWithReadReplicas.this.requestAttributes);
      this.id = id;
      this.location = location;
    }

    /**
     * Two responsibilities:
     * <ul>
     * <li>if the call is already completed (by another replica), stop the retries;</li>
     * <li>set the location to the right region, depending on the replica.</li>
     * </ul>
     */
    @Override
    // TODO: Very like the super class implementation. Can we shrink this down?
    public void prepare(final boolean reload) throws IOException {
      if (getRpcController().isCanceled()) return;
      if (Thread.interrupted()) {
        throw new InterruptedIOException();
      }
      if (reload || location == null) {
        RegionLocations rl = getRegionLocations(false, id, cConnection, tableName, get.getRow());
        location = id < rl.size() ? rl.getRegionLocation(id) : null;
      }

      if (location == null || location.getServerName() == null) {
        // With this exception, there will be a retry. The location can be null for a replica
        // when the table is created or after a split.
        throw new HBaseIOException("There is no location for replica id #" + id);
      }

      setStubByServiceName(this.location.getServerName());
    }

    @Override
    // TODO: Very like the super class implementation. Can we shrink this down?
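    // Builds a protobuf GetRequest for the region this callable was bound to in prepare(), sends
    // it to the region server stub set up there, and converts the GetResponse plus the cells in
    // the RPC cell scanner back into a client-side Result. Returns null if the controller was
    // already cancelled (e.g. because another replica answered first) or if there was no response.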
    protected Result rpcCall() throws Exception {
      if (getRpcController().isCanceled()) return null;
      if (Thread.interrupted()) {
        throw new InterruptedIOException();
      }
      byte[] reg = location.getRegionInfo().getRegionName();
      ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get);
      HBaseRpcController hrc = (HBaseRpcController) getRpcController();
      hrc.reset();
      hrc.setCallTimeout(rpcTimeout);
      hrc.setPriority(tableName);
      ClientProtos.GetResponse response = getStub().get(hrc, request);
      if (response == null) {
        return null;
      }
      return ProtobufUtil.toResult(response.getResult(), hrc.cellScanner());
    }
  }

  /**
   * Algorithm:
   * <ul>
   * <li>we put the query into the execution pool;</li>
   * <li>after x ms, if we don't have a result, we add the queries for the secondary replicas;</li>
   * <li>we take the first answer;</li>
   * <li>when done, we cancel what's left. Cancelling means removing the call from the pool if it
   * has not started yet, and interrupting it if it has started.</li>
   * </ul>
   * <p>
   * Client side, we need to take into account that a call is not executed immediately after being
   * put into the pool, and that a call is a thread: let's not multiply the number of threads by
   * the number of replicas. Server side, if we can cancel while the call is still in the handler
   * pool, it's much better, as a call can take some i/o.
   * </p>
   * <p>
   * Globally, the number of retries, timeouts and so on still applies, but per replica, not
   * globally. We continue until all retries are done, or all timeouts are exceeded.
   * </p>
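   * <p>
   * A minimal sketch of the same hedging pattern with plain {@code java.util.concurrent} types;
   * {@code primaryCall}, {@code secondaryCalls} and the two timeouts are illustrative placeholders
   * rather than members of this class, and exception handling is elided:
   * </p>
   *
   * <pre>{@code
   * CompletionService<Result> cs = new ExecutorCompletionService<>(pool);
   * List<Future<Result>> futures = new ArrayList<>();
   * futures.add(cs.submit(primaryCall));                      // the primary goes out first
   * Future<Result> first = cs.poll(primaryTimeoutMicros, TimeUnit.MICROSECONDS);
   * if (first == null) {                                      // primary too slow: hedge
   *   for (Callable<Result> secondary : secondaryCalls) {
   *     futures.add(cs.submit(secondary));
   *   }
   *   first = cs.poll(operationTimeoutMillis, TimeUnit.MILLISECONDS);
   * }
   * for (Future<Result> f : futures) {
   *   f.cancel(true);                                         // stop whatever is still running
   * }
   * Result r = (first == null) ? null : first.get();          // first answer wins
   * }</pre>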
   */
  public Result call(int operationTimeout)
    throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
    boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0);

    RegionLocations rl = null;
    boolean skipPrimary = false;
    try {
      rl = getRegionLocations(true,
        (isTargetReplicaSpecified ? get.getReplicaId() : RegionReplicaUtil.DEFAULT_REPLICA_ID),
        cConnection, tableName, get.getRow());
    } catch (RetriesExhaustedException | DoNotRetryIOException e) {
      // When there is no specific replica id specified, it just needs to load all replicas.
      if (isTargetReplicaSpecified) {
        throw e;
      } else {
        // We cannot get the primary replica location; it is possible that the region
        // server hosting meta is down, so it needs to proceed to try cached replicas.
        if (cConnection instanceof ConnectionImplementation) {
          rl = ((ConnectionImplementation) cConnection).getCachedLocation(tableName, get.getRow());
          if (rl == null) {
            // No cached locations
            throw e;
          }

          // Primary replica location is not known, skip primary replica
          skipPrimary = true;
        } else {
          // For completeness
          throw e;
        }
      }
    }

    final ResultBoundedCompletionService<Result> cs =
      new ResultBoundedCompletionService<>(this.rpcRetryingCallerFactory, pool, rl.size());
    int startIndex = 0;
    int endIndex = rl.size();

    if (isTargetReplicaSpecified) {
      addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
      endIndex = 1;
    } else {
      if (!skipPrimary) {
        addCallsForReplica(cs, rl, 0, 0);
        try {
          // wait for the timeout to see whether the primary responds back
          Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes,
                                                                                 // microseconds
          if (f != null) {
            return f.get(); // great we got a response
          }
          if (cConnection.getConnectionMetrics() != null) {
            cConnection.getConnectionMetrics().incrHedgedReadOps();
          }
        } catch (ExecutionException e) {
          // We ignore the ExecutionException and continue with the secondary replicas
          if (LOG.isDebugEnabled()) {
            LOG.debug("Primary replica returns " + e.getCause());
          }

          // Skip the result from the primary as we know that there is something wrong
          startIndex = 1;
        } catch (CancellationException e) {
          throw new InterruptedIOException();
        } catch (InterruptedException e) {
          throw new InterruptedIOException();
        }
      } else {
        // Since the primary replica is skipped, the endIndex needs to be adjusted accordingly
        endIndex--;
      }

      // submit calls for all of the secondaries at once
      addCallsForReplica(cs, rl, 1, rl.size() - 1);
    }
    try {
      ResultBoundedCompletionService<Result>.QueueingFuture<Result> f =
        cs.pollForFirstSuccessfullyCompletedTask(operationTimeout, TimeUnit.MILLISECONDS,
          startIndex, endIndex);
      if (f == null) {
        throw new RetriesExhaustedException(
          "Timed out after " + operationTimeout + "ms. Get is sent to replicas with startIndex: "
            + startIndex + ", endIndex: " + endIndex + ", Locations: " + rl);
      }
      if (
        cConnection.getConnectionMetrics() != null && !isTargetReplicaSpecified && !skipPrimary
          && f.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID
      ) {
        cConnection.getConnectionMetrics().incrHedgedReadWin();
      }
      return f.get();
    } catch (ExecutionException e) {
      throwEnrichedException(e, retries);
    } catch (CancellationException e) {
      throw new InterruptedIOException();
    } catch (InterruptedException e) {
      throw new InterruptedIOException();
    } finally {
      // We get here because we were interrupted or because one or more of the
      // calls succeeded or failed. In all cases, we stop all our tasks.
      cs.cancelAll();
    }

    LOG.error("Impossible? Arrived at an unreachable line..."); // unreachable
    return null; // unreachable
  }

  /**
   * Extracts the real exception from the ExecutionException and throws what makes more sense.
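   * @param e       the ExecutionException thrown by the completion service; its cause is rethrown
   *                as-is when it is already a RetriesExhaustedException or a DoNotRetryIOException
   * @param retries the configured number of retries, reported in the RetriesExhaustedException
   *                that wraps any other cause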
   */
  static void throwEnrichedException(ExecutionException e, int retries)
    throws RetriesExhaustedException, DoNotRetryIOException {
    Throwable t = e.getCause();
    assert t != null; // That's what ExecutionException is about: holding an exception
    t.printStackTrace();

    if (t instanceof RetriesExhaustedException) {
      throw (RetriesExhaustedException) t;
    }

    if (t instanceof DoNotRetryIOException) {
      throw (DoNotRetryIOException) t;
    }

    RetriesExhaustedException.ThrowableWithExtraContext qt =
      new RetriesExhaustedException.ThrowableWithExtraContext(t,
        EnvironmentEdgeManager.currentTime(), null);

    List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
      Collections.singletonList(qt);

    throw new RetriesExhaustedException(retries, exceptions);
  }

  /**
   * Creates the calls and submits them.
   * @param cs  - the completion service to use for submitting
   * @param rl  - the region locations
   * @param min - the id of the first replica, inclusive
   * @param max - the id of the last replica, inclusive
   */
  private void addCallsForReplica(ResultBoundedCompletionService<Result> cs, RegionLocations rl,
    int min, int max) {
    for (int id = min; id <= max; id++) {
      HRegionLocation hrl = rl.getRegionLocation(id);
      ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
      cs.submit(callOnReplica, rpcTimeout, operationTimeout, id);
    }
  }

  static RegionLocations getRegionLocations(boolean useCache, int replicaId,
    ClusterConnection cConnection, TableName tableName, byte[] row)
    throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {

    RegionLocations rl;
    try {
      if (useCache) {
        rl = cConnection.locateRegion(tableName, row, true, true, replicaId);
      } else {
        rl = cConnection.relocateRegion(tableName, row, replicaId);
      }
    } catch (DoNotRetryIOException | InterruptedIOException | RetriesExhaustedException e) {
      throw e;
    } catch (IOException e) {
      throw new RetriesExhaustedException("Cannot get the location for replica " + replicaId
        + " of region for " + Bytes.toStringBinary(row) + " in " + tableName, e);
    }
    if (rl == null) {
      throw new RetriesExhaustedException("Cannot get the location for replica " + replicaId
        + " of region for " + Bytes.toStringBinary(row) + " in " + tableName);
    }

    return rl;
  }
}