002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.HashSet;
023import java.util.Set;
024import java.util.concurrent.Callable;
025import java.util.concurrent.CancellationException;
026import java.util.concurrent.ExecutionException;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.Future;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicBoolean;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.DoNotRetryIOException;
033import org.apache.hadoop.hbase.HRegionInfo;
034import org.apache.hadoop.hbase.RegionLocations;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
037import org.apache.hadoop.hbase.util.Pair;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
 * This class has the logic for handling scanners for regions with and without replicas. 1. A scan
 * is attempted on the default (primary) region, or on a specific region. 2. The scanner sends all
 * the RPCs to the default/specific region until it is done, or until there is a timeout on the
 * default/specific region (a negative timeout is rejected). 3. If there is a timeout in (2) above,
 * scanners are opened on the non-default replicas, but only for Consistency.TIMELINE scans that do
 * not specify a replica id. 4. The results from the first successful scanner are taken, and the
 * server that returned them is remembered. 5. The next RPCs are sent to that remembered server
 * until it is done or there is a timeout, in which case the other replicas are queried (as in (3)
 * above).
052 */
054class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
055  private static final Logger LOG = LoggerFactory.getLogger(ScannerCallableWithReplicas.class);
056  volatile ScannerCallable currentScannerCallable;
057  AtomicBoolean replicaSwitched = new AtomicBoolean(false);
058  private final ClusterConnection cConnection;
059  protected final ExecutorService pool;
060  private final boolean useScannerTimeoutForNextCalls;
061  protected final int timeBeforeReplicas;
062  private final Scan scan;
063  private final int retries;
064  private Result lastResult;
065  private final RpcRetryingCaller<Result[]> caller;
066  private final TableName tableName;
067  private Configuration conf;
068  private final int scannerTimeout;
069  private final int readRpcTimeout;
070  private Set<ScannerCallable> outstandingCallables = new HashSet<>();
071  private boolean someRPCcancelled = false; // required for testing purposes only
072  private int regionReplication = 0;
074  public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
075    ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
076    int retries, int readRpcTimeout, int scannerTimeout, boolean useScannerTimeoutForNextCalls,
077    int caching, Configuration conf, RpcRetryingCaller<Result[]> caller) {
078    this.currentScannerCallable = baseCallable;
079    this.cConnection = cConnection;
080    this.pool = pool;
081    this.useScannerTimeoutForNextCalls = useScannerTimeoutForNextCalls;
082    if (timeBeforeReplicas < 0) {
083      throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
084    }
085    this.timeBeforeReplicas = timeBeforeReplicas;
086    this.scan = scan;
087    this.retries = retries;
088    this.tableName = tableName;
089    this.conf = conf;
090    this.readRpcTimeout = readRpcTimeout;
091    this.scannerTimeout = scannerTimeout;
092    this.caller = caller;
093  }
095  public void setClose() {
096    if (currentScannerCallable != null) {
097      currentScannerCallable.setClose();
098    } else {
099      LOG.warn("Calling close on ScannerCallable reference that is already null, "
100        + "which shouldn't happen.");
101    }
102  }
104  public void setRenew(boolean val) {
105    currentScannerCallable.setRenew(val);
106  }
108  public void setCaching(int caching) {
109    currentScannerCallable.setCaching(caching);
110  }
112  public int getCaching() {
113    return currentScannerCallable.getCaching();
114  }
116  public HRegionInfo getHRegionInfo() {
117    return currentScannerCallable.getHRegionInfo();
118  }
120  public MoreResults moreResultsInRegion() {
121    return currentScannerCallable.moreResultsInRegion();
122  }
124  public MoreResults moreResultsForScan() {
125    return currentScannerCallable.moreResultsForScan();
126  }
128  @Override
129  public Result[] call(int timeout) throws IOException {
130    // If the active replica callable was closed somewhere, invoke the RPC to
131    // really close it. In the case of regular scanners, this applies. We make couple
132    // of RPCs to a RegionServer, and when that region is exhausted, we set
133    // the closed flag. Then an RPC is required to actually close the scanner.
134    if (currentScannerCallable != null && currentScannerCallable.closed) {
135      // For closing we target that exact scanner (and not do replica fallback like in
136      // the case of normal reads)
137      if (LOG.isTraceEnabled()) {
138        LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId);
139      }
140      Result[] r = currentScannerCallable.call(timeout);
141      currentScannerCallable = null;
142      return r;
143    } else if (currentScannerCallable == null) {
144      LOG.warn("Another call received, but our ScannerCallable is already null. "
145        + "This shouldn't happen, but there's not much to do, so logging and returning null.");
146      return null;
147    }
148    // We need to do the following:
149    // 1. When a scan goes out to a certain replica (default or not), we need to
150    // continue to hit that until there is a failure. So store the last successfully invoked
151    // replica
152    // 2. We should close the "losing" scanners (scanners other than the ones we hear back
153    // from first)
154    //
155    // Since RegionReplication is a table attribute, it wont change as long as table is enabled,
156    // it just needs to be set once.
158    if (regionReplication <= 0) {
159      RegionLocations rl = null;
160      try {
161        rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
162          RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
163          currentScannerCallable.getRow());
164      } catch (RetriesExhaustedException | DoNotRetryIOException e) {
165        // We cannot get the primary replica region location, it is possible that the region server
166        // hosting meta table is down, it needs to proceed to try cached replicas directly.
167        if (cConnection instanceof ConnectionImplementation) {
168          rl = ((ConnectionImplementation) cConnection).getCachedLocation(tableName,
169            currentScannerCallable.getRow());
170          if (rl == null) {
171            throw e;
172          }
173        } else {
174          // For completeness
175          throw e;
176        }
177      }
178      regionReplication = rl.size();
179    }
180    // allocate a bounded-completion pool of some multiple of number of replicas.
181    // We want to accommodate some RPCs for redundant replica scans (but are still in progress)
182    final ConnectionConfiguration connectionConfig = cConnection != null
183      ? cConnection.getConnectionConfiguration()
184      : new ConnectionConfiguration(ScannerCallableWithReplicas.this.conf);
185    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
186      new ResultBoundedCompletionService<>(
187        RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf,
188          connectionConfig, cConnection == null ? null : cConnection.getConnectionMetrics()),
189        pool, regionReplication * 5);
191    AtomicBoolean done = new AtomicBoolean(false);
192    // make sure we use the same rpcTimeout for current and other replicas
193    int rpcTimeoutForCall = getRpcTimeout();
195    replicaSwitched.set(false);
196    // submit call for the primary replica or user specified replica
197    addCallsForCurrentReplica(cs, rpcTimeoutForCall);
198    int startIndex = 0;
200    try {
201      // wait for the timeout to see whether the primary responds back
202      Future<Pair<Result[], ScannerCallable>> f =
203        cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
204      if (f != null) {
205        // After poll, if f is not null, there must be a completed task
206        Pair<Result[], ScannerCallable> r = f.get();
207        if (r != null && r.getSecond() != null) {
208          updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
209        }
210        return r == null ? null : r.getFirst(); // great we got a response
211      }
212    } catch (ExecutionException e) {
213      // We ignore the ExecutionException and continue with the replicas
214      if (LOG.isDebugEnabled()) {
215        LOG.debug("Scan with primary region returns " + e.getCause());
216      }
218      // If rl's size is 1 or scan's consitency is strong, or scan is over specific replica,
219      // it needs to throw out the exception from the primary replica
220      if (
221        regionReplication == 1 || scan.getConsistency() == Consistency.STRONG
222          || scan.getReplicaId() >= 0
223      ) {
224        // Rethrow the first exception
225        RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
226      }
227      startIndex = 1;
228    } catch (CancellationException e) {
229      throw new InterruptedIOException(e.getMessage());
230    } catch (InterruptedException e) {
231      throw new InterruptedIOException(e.getMessage());
232    }
234    // submit call for the all of the secondaries at once
235    int endIndex = regionReplication;
236    if (scan.getConsistency() == Consistency.STRONG || scan.getReplicaId() >= 0) {
237      // When scan's consistency is strong or scan is over specific replica region, do not send to
238      // the secondaries
239      endIndex = 1;
240    } else {
241      // TODO: this may be an overkill for large region replication
242      addCallsForOtherReplicas(cs, 0, regionReplication - 1, rpcTimeoutForCall);
243    }
245    try {
246      Future<Pair<Result[], ScannerCallable>> f = cs.pollForFirstSuccessfullyCompletedTask(timeout,
247        TimeUnit.MILLISECONDS, startIndex, endIndex);
249      if (f == null) {
250        throw new IOException("Failed to get result within timeout, timeout=" + timeout + "ms");
251      }
252      Pair<Result[], ScannerCallable> r = f.get();
254      if (r != null && r.getSecond() != null) {
255        updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
256      }
257      return r == null ? null : r.getFirst(); // great we got an answer
259    } catch (ExecutionException e) {
260      RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
261    } catch (CancellationException e) {
262      throw new InterruptedIOException(e.getMessage());
263    } catch (InterruptedException e) {
264      throw new InterruptedIOException(e.getMessage());
265    } finally {
266      // We get there because we were interrupted or because one or more of the
267      // calls succeeded or failed. In all case, we stop all our tasks.
268      cs.cancelAll();
269    }
270    LOG.error("Imposible? Arrive at an unreachable line..."); // unreachable
271    throw new IOException("Imposible? Arrive at an unreachable line...");
272  }
274  @SuppressWarnings("FutureReturnValueIgnored")
275  private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
276    AtomicBoolean done, ExecutorService pool) {
277    if (done.compareAndSet(false, true)) {
278      if (currentScannerCallable != scanner) replicaSwitched.set(true);
279      currentScannerCallable = scanner;
280      // store where to start the replica scanner from if we need to.
281      if (result != null && result.length != 0) this.lastResult = result[result.length - 1];
282      if (LOG.isTraceEnabled()) {
283        LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId
284          + " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId());
285      }
286      // close all outstanding replica scanners but the one we heard back from
287      outstandingCallables.remove(scanner);
288      for (ScannerCallable s : outstandingCallables) {
289        if (LOG.isTraceEnabled()) {
290          LOG.trace("Closing scanner id=" + s.scannerId + ", replica="
291            + s.getHRegionInfo().getRegionId() + " because slow and replica="
292            + this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded");
293        }
294        // Submit the "close" to the pool since this might take time, and we don't
295        // want to wait for the "close" to happen yet. The "wait" will happen when
296        // the table is closed (when the awaitTermination of the underlying pool is called)
297        s.setClose();
298        final RetryingRPC r = new RetryingRPC(s);
299        pool.submit(new Callable<Void>() {
300          @Override
301          public Void call() throws Exception {
302            r.call(scannerTimeout);
303            return null;
304          }
305        });
306      }
307      // now clear outstandingCallables since we scheduled a close for all the contained scanners
308      outstandingCallables.clear();
309    }
310  }
312  /**
313   * When a scanner switches in the middle of scanning (the 'next' call fails for example), the
314   * upper layer {@link ClientScanner} needs to know
315   */
316  public boolean switchedToADifferentReplica() {
317    return replicaSwitched.get();
318  }
320  /**
321   * Returns true when the most recent RPC response indicated that the response was a heartbeat
322   * message. Heartbeat messages are sent back from the server when the processing of the scan
323   * request exceeds a certain time threshold. Heartbeats allow the server to avoid timeouts during
324   * long running scan operations.
325   */
326  public boolean isHeartbeatMessage() {
327    return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
328  }
330  public Cursor getCursor() {
331    return currentScannerCallable != null ? currentScannerCallable.getCursor() : null;
332  }
334  private void addCallsForCurrentReplica(
335    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int rpcTimeout) {
336    RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
337    outstandingCallables.add(currentScannerCallable);
338    cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, currentScannerCallable.id);
339  }
341  /**
342   * As we have a call sequence for scan, it is useless to have a different rpc timeout which is
343   * less than the scan timeout. If the server does not respond in time(usually this will not happen
344   * as we have heartbeat now), we will get an OutOfOrderScannerNextException when resending the
345   * next request and the only way to fix this is to close the scanner and open a new one.
346   * <p>
347   * The legacy behavior of ScannerCallable has been to use readRpcTimeout despite the above. If
348   * using legacy behavior, we always use that.
349   * <p>
350   * If new behavior is enabled, we determine the rpc timeout to use based on whether the scanner is
351   * open. If scanner is open, use scannerTimeout otherwise use readRpcTimeout.
352   */
353  private int getRpcTimeout() {
354    if (useScannerTimeoutForNextCalls) {
355      return isNextCall() ? scannerTimeout : readRpcTimeout;
356    } else {
357      return readRpcTimeout;
358    }
359  }
361  private boolean isNextCall() {
362    return currentScannerCallable != null && currentScannerCallable.scannerId != -1
363      && !currentScannerCallable.renew && !currentScannerCallable.closed;
364  }
366  private void addCallsForOtherReplicas(
367    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max,
368    int rpcTimeout) {
370    for (int id = min; id <= max; id++) {
371      if (currentScannerCallable.id == id) {
372        continue; // this was already scheduled earlier
373      }
374      ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
375      setStartRowForReplicaCallable(s);
376      outstandingCallables.add(s);
377      RetryingRPC retryingOnReplica = new RetryingRPC(s);
378      cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, id);
379    }
380  }
382  /**
383   * Set the start row for the replica callable based on the state of the last result received.
384   * @param callable The callable to set the start row on
385   */
386  private void setStartRowForReplicaCallable(ScannerCallable callable) {
387    if (this.lastResult == null || callable == null) {
388      return;
389    }
390    // 1. The last result was a partial result which means we have not received all of the cells
391    // for this row. Thus, use the last result's row as the start row. If a replica switch
392    // occurs, the scanner will ensure that any accumulated partial results are cleared,
393    // and the scan can resume from this row.
394    // 2. The last result was not a partial result which means it contained all of the cells for
395    // that row (we no longer need any information from it). Set the start row to the next
396    // closest row that could be seen.
397    callable.getScan().withStartRow(this.lastResult.getRow(),
398      this.lastResult.mayHaveMoreCellsInRow());
399  }
401  boolean isAnyRPCcancelled() {
402    return someRPCcancelled;
403  }
405  class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
406    final ScannerCallable callable;
407    RpcRetryingCaller<Result[]> caller;
408    private volatile boolean cancelled = false;
410    RetryingRPC(ScannerCallable callable) {
411      this.callable = callable;
412      // For the Consistency.STRONG (default case), we reuse the caller
413      // to keep compatibility with what is done in the past
414      // For the Consistency.TIMELINE case, we can't reuse the caller
415      // since we could be making parallel RPCs (caller.callWithRetries is synchronized
416      // and we can't invoke it multiple times at the same time)
417      this.caller = ScannerCallableWithReplicas.this.caller;
418      if (scan.getConsistency() == Consistency.TIMELINE) {
419        final ConnectionConfiguration connectionConfig = cConnection != null
420          ? cConnection.getConnectionConfiguration()
421          : new ConnectionConfiguration(ScannerCallableWithReplicas.this.conf);
422        this.caller =
423          RpcRetryingCallerFactory
424            .instantiate(ScannerCallableWithReplicas.this.conf, connectionConfig,
425              cConnection == null ? null : cConnection.getConnectionMetrics())
426            .<Result[]> newCaller();
427      }
428    }
430    @Override
431    public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
432      // since the retries is done within the ResultBoundedCompletionService,
433      // we don't invoke callWithRetries here
434      if (cancelled) {
435        return null;
436      }
437      Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
438      return new Pair<>(res, this.callable);
439    }
441    @Override
442    public void prepare(boolean reload) throws IOException {
443      if (cancelled) return;
445      if (Thread.interrupted()) {
446        throw new InterruptedIOException();
447      }
449      callable.prepare(reload);
450    }
452    @Override
453    public void throwable(Throwable t, boolean retrying) {
454      callable.throwable(t, retrying);
455    }
457    @Override
458    public String getExceptionMessageAdditionalDetail() {
459      return callable.getExceptionMessageAdditionalDetail();
460    }
462    @Override
463    public long sleep(long pause, int tries) {
464      return callable.sleep(pause, tries);
465    }
467    @Override
468    public void cancel() {
469      cancelled = true;
470      caller.cancel();
471      if (callable.getRpcController() != null) {
472        callable.getRpcController().startCancel();
473      }
474      someRPCcancelled = true;
475    }
477    @Override
478    public boolean isCancelled() {
479      return cancelled;
480    }
481  }
483  @Override
484  public void prepare(boolean reload) throws IOException {
485  }
487  @Override
488  public void throwable(Throwable t, boolean retrying) {
489    currentScannerCallable.throwable(t, retrying);
490  }
492  @Override
493  public String getExceptionMessageAdditionalDetail() {
494    return currentScannerCallable.getExceptionMessageAdditionalDetail();
495  }
497  @Override
498  public long sleep(long pause, int tries) {
499    return currentScannerCallable.sleep(pause, tries);
500  }