/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class has the logic for handling scanners for regions with and without replicas.
 * <ol>
 * <li>A scan is attempted on the default (primary) region, or a specific region.</li>
 * <li>The scanner sends all the RPCs to the default/specific region until it is done, or there is
 * a timeout on the default/specific region (a timeout of zero is disallowed).</li>
 * <li>If there is a timeout in (2) above, scanners are opened on the non-default replica(s), but
 * only for Consistency.TIMELINE scans without a specific replica id.</li>
 * <li>The results from the first successful scanner are taken, and the server that returned them
 * is remembered.</li>
 * <li>Subsequent RPCs go to the server stored in (4) until it is done or there is a timeout, in
 * which case the other replicas are queried (as in (3) above).</li>
 * </ol>
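 * <p>
 * For illustration only (not part of this class): a minimal sketch of the client-side setup that
 * exercises the replica fallback described above. The {@code connection} and {@code tableName}
 * variables are assumed to be provided by the caller.
 *
 * <pre>{@code
 * Scan scan = new Scan();
 * scan.setConsistency(Consistency.TIMELINE); // allow reads from secondary replicas
 * try (Table table = connection.getTable(tableName);
 *   ResultScanner scanner = table.getScanner(scan)) {
 *   for (Result result : scanner) {
 *     // result.isStale() is true when the row was served by a non-default replica
 *   }
 * }
 * }</pre>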
 */
@InterfaceAudience.Private
class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
  private static final Logger LOG = LoggerFactory.getLogger(ScannerCallableWithReplicas.class);
  volatile ScannerCallable currentScannerCallable;
  AtomicBoolean replicaSwitched = new AtomicBoolean(false);
  private final ClusterConnection cConnection;
  protected final ExecutorService pool;
  private final boolean useScannerTimeoutForNextCalls;
  protected final int timeBeforeReplicas;
  private final Scan scan;
  private final int retries;
  private Result lastResult;
  private final RpcRetryingCaller<Result[]> caller;
  private final TableName tableName;
  private Configuration conf;
  private final int scannerTimeout;
  private final int readRpcTimeout;
  private Set<ScannerCallable> outstandingCallables = new HashSet<>();
  private boolean someRPCcancelled = false; // required for testing purposes only
  private int regionReplication = 0;

  public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
    ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
    int retries, int readRpcTimeout, int scannerTimeout, boolean useScannerTimeoutForNextCalls,
    int caching, Configuration conf, RpcRetryingCaller<Result[]> caller) {
    this.currentScannerCallable = baseCallable;
    this.cConnection = cConnection;
    this.pool = pool;
    this.useScannerTimeoutForNextCalls = useScannerTimeoutForNextCalls;
    if (timeBeforeReplicas < 0) {
      throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
    }
    this.timeBeforeReplicas = timeBeforeReplicas;
    this.scan = scan;
    this.retries = retries;
    this.tableName = tableName;
    this.conf = conf;
    this.readRpcTimeout = readRpcTimeout;
    this.scannerTimeout = scannerTimeout;
    this.caller = caller;
  }

  public void setClose() {
    if (currentScannerCallable != null) {
      currentScannerCallable.setClose();
    } else {
      LOG.warn("Calling close on ScannerCallable reference that is already null, "
        + "which shouldn't happen.");
    }
  }

  public void setRenew(boolean val) {
    currentScannerCallable.setRenew(val);
  }

  public void setCaching(int caching) {
    currentScannerCallable.setCaching(caching);
  }

  public int getCaching() {
    return currentScannerCallable.getCaching();
  }

  public HRegionInfo getHRegionInfo() {
    return currentScannerCallable.getHRegionInfo();
  }

  public MoreResults moreResultsInRegion() {
    return currentScannerCallable.moreResultsInRegion();
  }

  public MoreResults moreResultsForScan() {
    return currentScannerCallable.moreResultsForScan();
  }

  @Override
  public Result[] call(int timeout) throws IOException {
    // If the active replica callable was closed somewhere, invoke the RPC to
    // really close it. This applies to regular scanners: we make a couple of RPCs
    // to a RegionServer, and when that region is exhausted, we set the closed flag.
    // Then an RPC is required to actually close the scanner.
    if (currentScannerCallable != null && currentScannerCallable.closed) {
      // For closing we target that exact scanner (and do not do replica fallback like in
      // the case of normal reads)
      if (LOG.isTraceEnabled()) {
        LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId);
      }
      Result[] r = currentScannerCallable.call(timeout);
      currentScannerCallable = null;
      return r;
    } else if (currentScannerCallable == null) {
      LOG.warn("Another call received, but our ScannerCallable is already null. "
        + "This shouldn't happen, but there's not much to do, so logging and returning null.");
      return null;
    }
    // We need to do the following:
    // 1. When a scan goes out to a certain replica (default or not), we need to
    //    continue to hit that replica until there is a failure. So store the last
    //    successfully invoked replica.
    // 2. We should close the "losing" scanners (scanners other than the one we hear back
    //    from first).
    //
    // Since RegionReplication is a table attribute, it won't change as long as the table is
    // enabled; it just needs to be set once.

    if (regionReplication <= 0) {
      RegionLocations rl = null;
      try {
        rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
          RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
          currentScannerCallable.getRow());
      } catch (RetriesExhaustedException | DoNotRetryIOException e) {
        // We cannot get the primary replica region location; it is possible that the region
        // server hosting the meta table is down, so proceed to try the cached replicas directly.
        if (cConnection instanceof ConnectionImplementation) {
          rl = ((ConnectionImplementation) cConnection).getCachedLocation(tableName,
            currentScannerCallable.getRow());
          if (rl == null) {
            throw e;
          }
        } else {
          // For completeness
          throw e;
        }
      }
      regionReplication = rl.size();
    }
    // Allocate a bounded-completion pool of some multiple of the number of replicas.
    // We want to accommodate some RPCs for redundant replica scans that are still in progress.
    final ConnectionConfiguration connectionConfig = cConnection != null
      ? cConnection.getConnectionConfiguration()
      : new ConnectionConfiguration(ScannerCallableWithReplicas.this.conf);
    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
      new ResultBoundedCompletionService<>(
        RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf,
          connectionConfig, cConnection == null ? null : cConnection.getConnectionMetrics()),
        pool, regionReplication * 5);

    AtomicBoolean done = new AtomicBoolean(false);
    // make sure we use the same rpcTimeout for the current and the other replicas
    int rpcTimeoutForCall = getRpcTimeout();

    replicaSwitched.set(false);
    // submit the call for the primary replica or the user-specified replica
    addCallsForCurrentReplica(cs, rpcTimeoutForCall);
    int startIndex = 0;

    try {
      // wait for the timeout to see whether the primary responds back
      Future<Pair<Result[], ScannerCallable>> f =
        cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
      if (f != null) {
        // After poll, if f is not null, there must be a completed task
        Pair<Result[], ScannerCallable> r = f.get();
        if (r != null && r.getSecond() != null) {
          updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
        }
        return r == null ? null : r.getFirst(); // great, we got a response
      }
    } catch (ExecutionException e) {
      // We ignore the ExecutionException and continue with the replicas
      if (LOG.isDebugEnabled()) {
        LOG.debug("Scan with primary region returns " + e.getCause());
      }

      // If rl's size is 1, or the scan's consistency is strong, or the scan is over a specific
      // replica, we need to throw the exception from the primary replica.
      if (
        regionReplication == 1 || scan.getConsistency() == Consistency.STRONG
          || scan.getReplicaId() >= 0
      ) {
        // Rethrow the first exception
        RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
      }
      startIndex = 1;
    } catch (CancellationException e) {
      throw new InterruptedIOException(e.getMessage());
    } catch (InterruptedException e) {
      throw new InterruptedIOException(e.getMessage());
    }

    // submit calls for all of the secondaries at once
    int endIndex = regionReplication;
    if (scan.getConsistency() == Consistency.STRONG || scan.getReplicaId() >= 0) {
      // When the scan's consistency is strong or the scan is over a specific replica region,
      // do not send to the secondaries
      endIndex = 1;
    } else {
      // TODO: this may be overkill for large region replication
      addCallsForOtherReplicas(cs, 0, regionReplication - 1, rpcTimeoutForCall);
    }

    try {
      Future<Pair<Result[], ScannerCallable>> f = cs.pollForFirstSuccessfullyCompletedTask(timeout,
        TimeUnit.MILLISECONDS, startIndex, endIndex);

      if (f == null) {
        throw new IOException("Failed to get result within timeout, timeout=" + timeout + "ms");
      }
      Pair<Result[], ScannerCallable> r = f.get();

      if (r != null && r.getSecond() != null) {
        updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
      }
      return r == null ? null : r.getFirst(); // great, we got an answer

    } catch (ExecutionException e) {
      RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
    } catch (CancellationException e) {
      throw new InterruptedIOException(e.getMessage());
    } catch (InterruptedException e) {
      throw new InterruptedIOException(e.getMessage());
    } finally {
      // We get here because we were interrupted or because one or more of the
      // calls succeeded or failed. In all cases, we stop all our tasks.
      cs.cancelAll();
    }
    LOG.error("Impossible? Arrived at an unreachable line..."); // unreachable
    throw new IOException("Impossible? Arrived at an unreachable line...");
  }

  @SuppressWarnings("FutureReturnValueIgnored")
  private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
    AtomicBoolean done, ExecutorService pool) {
    if (done.compareAndSet(false, true)) {
      if (currentScannerCallable != scanner) replicaSwitched.set(true);
      currentScannerCallable = scanner;
      // store where to start the replica scanner from if we need to
      if (result != null && result.length != 0) this.lastResult = result[result.length - 1];
      if (LOG.isTraceEnabled()) {
        LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId
          + " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId());
      }
      // close all outstanding replica scanners but the one we heard back from
      outstandingCallables.remove(scanner);
      for (ScannerCallable s : outstandingCallables) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Closing scanner id=" + s.scannerId + ", replica="
            + s.getHRegionInfo().getRegionId() + " because slow and replica="
            + this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded");
        }
        // Submit the "close" to the pool since this might take time, and we don't
        // want to wait for the "close" to happen yet. The "wait" will happen when
        // the table is closed (when the awaitTermination of the underlying pool is called)
        s.setClose();
        final RetryingRPC r = new RetryingRPC(s);
        pool.submit(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            r.call(scannerTimeout);
            return null;
          }
        });
      }
      // now clear outstandingCallables since we scheduled a close for all the contained scanners
      outstandingCallables.clear();
    }
  }

  /**
   * When a scanner switches in the middle of scanning (for example, when the 'next' call fails),
   * the upper layer {@link ClientScanner} needs to know.
   */
  public boolean switchedToADifferentReplica() {
    return replicaSwitched.get();
  }

  /**
   * Returns true when the most recent RPC response indicated that the response was a heartbeat
   * message. Heartbeat messages are sent back from the server when the processing of the scan
   * request exceeds a certain time threshold. Heartbeats allow the server to avoid timeouts during
   * long running scan operations.
   */
  public boolean isHeartbeatMessage() {
    return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
  }

  public Cursor getCursor() {
    return currentScannerCallable != null ? currentScannerCallable.getCursor() : null;
  }

  private void addCallsForCurrentReplica(
    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int rpcTimeout) {
    RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
    outstandingCallables.add(currentScannerCallable);
    cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, currentScannerCallable.id);
  }

  /**
   * As we have a call sequence for scans, it is useless to have a different rpc timeout which is
   * less than the scan timeout. If the server does not respond in time (usually this will not
   * happen as we have heartbeats now), we will get an OutOfOrderScannerNextException when
   * resending the next request, and the only way to fix this is to close the scanner and open a
   * new one.
   * <p>
   * The legacy behavior of ScannerCallable has been to use readRpcTimeout despite the above. If
   * using legacy behavior, we always use that.
   * <p>
   * If the new behavior is enabled, we determine the rpc timeout to use based on whether the
   * scanner is open: if the scanner is open, use scannerTimeout; otherwise use readRpcTimeout.
   */
  private int getRpcTimeout() {
    if (useScannerTimeoutForNextCalls) {
      return isNextCall() ? scannerTimeout : readRpcTimeout;
    } else {
      return readRpcTimeout;
    }
  }

  private boolean isNextCall() {
    return currentScannerCallable != null && currentScannerCallable.scannerId != -1
      && !currentScannerCallable.renew && !currentScannerCallable.closed;
  }

  private void addCallsForOtherReplicas(
    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max,
    int rpcTimeout) {

    for (int id = min; id <= max; id++) {
      if (currentScannerCallable.id == id) {
        continue; // this was already scheduled earlier
      }
      ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
      setStartRowForReplicaCallable(s);
      outstandingCallables.add(s);
      RetryingRPC retryingOnReplica = new RetryingRPC(s);
      cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, id);
    }
  }

  /**
   * Set the start row for the replica callable based on the state of the last result received.
   * @param callable The callable to set the start row on
   */
  private void setStartRowForReplicaCallable(ScannerCallable callable) {
    if (this.lastResult == null || callable == null) {
      return;
    }
    // 1. The last result was a partial result which means we have not received all of the cells
    //    for this row. Thus, use the last result's row as the start row. If a replica switch
    //    occurs, the scanner will ensure that any accumulated partial results are cleared,
    //    and the scan can resume from this row.
    // 2. The last result was not a partial result which means it contained all of the cells for
    //    that row (we no longer need any information from it). Set the start row to the next
    //    closest row that could be seen.
    callable.getScan().withStartRow(this.lastResult.getRow(),
      this.lastResult.mayHaveMoreCellsInRow());
  }

  boolean isAnyRPCcancelled() {
    return someRPCcancelled;
  }

  class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
    final ScannerCallable callable;
    RpcRetryingCaller<Result[]> caller;
    private volatile boolean cancelled = false;

    RetryingRPC(ScannerCallable callable) {
      this.callable = callable;
      // For the Consistency.STRONG (default case), we reuse the caller
      // to keep compatibility with what is done in the past
      // For the Consistency.TIMELINE case, we can't reuse the caller
      // since we could be making parallel RPCs (caller.callWithRetries is synchronized
      // and we can't invoke it multiple times at the same time)
      this.caller = ScannerCallableWithReplicas.this.caller;
      if (scan.getConsistency() == Consistency.TIMELINE) {
        final ConnectionConfiguration connectionConfig = cConnection != null
          ? cConnection.getConnectionConfiguration()
          : new ConnectionConfiguration(ScannerCallableWithReplicas.this.conf);
        this.caller =
          RpcRetryingCallerFactory
            .instantiate(ScannerCallableWithReplicas.this.conf, connectionConfig,
              cConnection == null ? null : cConnection.getConnectionMetrics())
            .<Result[]> newCaller();
      }
    }

    @Override
    public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
      // since the retries are done within the ResultBoundedCompletionService,
      // we don't invoke callWithRetries here
      if (cancelled) {
        return null;
      }
      Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
      return new Pair<>(res, this.callable);
    }

    @Override
    public void prepare(boolean reload) throws IOException {
      if (cancelled) return;

      if (Thread.interrupted()) {
        throw new InterruptedIOException();
      }

      callable.prepare(reload);
    }

    @Override
    public void throwable(Throwable t, boolean retrying) {
      callable.throwable(t, retrying);
    }

    @Override
    public String getExceptionMessageAdditionalDetail() {
      return callable.getExceptionMessageAdditionalDetail();
    }

    @Override
    public long sleep(long pause, int tries) {
      return callable.sleep(pause, tries);
    }

    @Override
    public void cancel() {
      cancelled = true;
      caller.cancel();
      if (callable.getRpcController() != null) {
        callable.getRpcController().startCancel();
      }
      someRPCcancelled = true;
    }

    @Override
    public boolean isCancelled() {
      return cancelled;
    }
  }

  @Override
  public void prepare(boolean reload) throws IOException {
  }

  @Override
  public void throwable(Throwable t, boolean retrying) {
    currentScannerCallable.throwable(t, retrying);
  }

  @Override
  public String getExceptionMessageAdditionalDetail() {
    return currentScannerCallable.getExceptionMessageAdditionalDetail();
  }

  @Override
  public long sleep(long pause, int tries) {
    return currentScannerCallable.sleep(pause, tries);
  }
}