001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.HashSet;
023import java.util.Set;
024import java.util.concurrent.Callable;
025import java.util.concurrent.CancellationException;
026import java.util.concurrent.ExecutionException;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.Future;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicBoolean;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.DoNotRetryIOException;
033import org.apache.hadoop.hbase.HRegionInfo;
034import org.apache.hadoop.hbase.RegionLocations;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
037import org.apache.hadoop.hbase.util.Pair;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * This class has the logic for handling scanners for regions with and without replicas. 1. A scan
044 * is attempted on the default (primary) region, or a specific region. 2. The scanner sends all the
045 * RPCs to the default/specific region until it is done, or, there is a timeout on the
046 * default/specific region (a timeout of zero is disallowed). 3. If there is a timeout in (2) above,
047 * scanner(s) is opened on the non-default replica(s) only for Consistency.TIMELINE without specific
048 * replica id specified. 4. The results from the first successful scanner are taken, and it is
049 * stored which server returned the results. 5. The next RPCs are done on the above stored server
050 * until it is done or there is a timeout, in which case, the other replicas are queried (as in (3)
051 * above).
052 */
053@InterfaceAudience.Private
054class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
055  private static final Logger LOG = LoggerFactory.getLogger(ScannerCallableWithReplicas.class);
056  volatile ScannerCallable currentScannerCallable;
057  AtomicBoolean replicaSwitched = new AtomicBoolean(false);
058  private final ClusterConnection cConnection;
059  protected final ExecutorService pool;
060  private final boolean useScannerTimeoutForNextCalls;
061  protected final int timeBeforeReplicas;
062  private final Scan scan;
063  private final int retries;
064  private Result lastResult;
065  private final RpcRetryingCaller<Result[]> caller;
066  private final TableName tableName;
067  private Configuration conf;
068  private final int scannerTimeout;
069  private final int readRpcTimeout;
070  private Set<ScannerCallable> outstandingCallables = new HashSet<>();
071  private boolean someRPCcancelled = false; // required for testing purposes only
072  private int regionReplication = 0;
073
074  public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
075    ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
076    int retries, int readRpcTimeout, int scannerTimeout, boolean useScannerTimeoutForNextCalls,
077    int caching, Configuration conf, RpcRetryingCaller<Result[]> caller) {
078    this.currentScannerCallable = baseCallable;
079    this.cConnection = cConnection;
080    this.pool = pool;
081    this.useScannerTimeoutForNextCalls = useScannerTimeoutForNextCalls;
082    if (timeBeforeReplicas < 0) {
083      throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
084    }
085    this.timeBeforeReplicas = timeBeforeReplicas;
086    this.scan = scan;
087    this.retries = retries;
088    this.tableName = tableName;
089    this.conf = conf;
090    this.readRpcTimeout = readRpcTimeout;
091    this.scannerTimeout = scannerTimeout;
092    this.caller = caller;
093  }
094
095  public void setClose() {
096    if (currentScannerCallable != null) {
097      currentScannerCallable.setClose();
098    } else {
099      LOG.warn("Calling close on ScannerCallable reference that is already null, "
100        + "which shouldn't happen.");
101    }
102  }
103
104  public void setRenew(boolean val) {
105    currentScannerCallable.setRenew(val);
106  }
107
108  public void setCaching(int caching) {
109    currentScannerCallable.setCaching(caching);
110  }
111
112  public int getCaching() {
113    return currentScannerCallable.getCaching();
114  }
115
116  public HRegionInfo getHRegionInfo() {
117    return currentScannerCallable.getHRegionInfo();
118  }
119
120  public MoreResults moreResultsInRegion() {
121    return currentScannerCallable.moreResultsInRegion();
122  }
123
124  public MoreResults moreResultsForScan() {
125    return currentScannerCallable.moreResultsForScan();
126  }
127
128  @Override
129  public Result[] call(int timeout) throws IOException {
130    // If the active replica callable was closed somewhere, invoke the RPC to
131    // really close it. In the case of regular scanners, this applies. We make couple
132    // of RPCs to a RegionServer, and when that region is exhausted, we set
133    // the closed flag. Then an RPC is required to actually close the scanner.
134    if (currentScannerCallable != null && currentScannerCallable.closed) {
135      // For closing we target that exact scanner (and not do replica fallback like in
136      // the case of normal reads)
137      if (LOG.isTraceEnabled()) {
138        LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId);
139      }
140      Result[] r = currentScannerCallable.call(timeout);
141      currentScannerCallable = null;
142      return r;
143    } else if (currentScannerCallable == null) {
144      LOG.warn("Another call received, but our ScannerCallable is already null. "
145        + "This shouldn't happen, but there's not much to do, so logging and returning null.");
146      return null;
147    }
148    // We need to do the following:
149    // 1. When a scan goes out to a certain replica (default or not), we need to
150    // continue to hit that until there is a failure. So store the last successfully invoked
151    // replica
152    // 2. We should close the "losing" scanners (scanners other than the ones we hear back
153    // from first)
154    //
155    // Since RegionReplication is a table attribute, it wont change as long as table is enabled,
156    // it just needs to be set once.
157
158    if (regionReplication <= 0) {
159      RegionLocations rl = null;
160      try {
161        rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
162          RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
163          currentScannerCallable.getRow());
164      } catch (RetriesExhaustedException | DoNotRetryIOException e) {
165        // We cannot get the primary replica region location, it is possible that the region server
166        // hosting meta table is down, it needs to proceed to try cached replicas directly.
167        if (cConnection instanceof ConnectionImplementation) {
168          rl = ((ConnectionImplementation) cConnection).getCachedLocation(tableName,
169            currentScannerCallable.getRow());
170          if (rl == null) {
171            throw e;
172          }
173        } else {
174          // For completeness
175          throw e;
176        }
177      }
178      regionReplication = rl.size();
179    }
180    // allocate a bounded-completion pool of some multiple of number of replicas.
181    // We want to accommodate some RPCs for redundant replica scans (but are still in progress)
182    final ConnectionConfiguration connectionConfig = cConnection != null
183      ? cConnection.getConnectionConfiguration()
184      : new ConnectionConfiguration(ScannerCallableWithReplicas.this.conf);
185    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
186      new ResultBoundedCompletionService<>(
187        RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf,
188          connectionConfig, cConnection == null ? null : cConnection.getConnectionMetrics()),
189        pool, regionReplication * 5);
190
191    AtomicBoolean done = new AtomicBoolean(false);
192    // make sure we use the same rpcTimeout for current and other replicas
193    int rpcTimeoutForCall = getRpcTimeout();
194
195    replicaSwitched.set(false);
196    // submit call for the primary replica or user specified replica
197    addCallsForCurrentReplica(cs, rpcTimeoutForCall);
198    int startIndex = 0;
199
200    try {
201      // wait for the timeout to see whether the primary responds back
202      Future<Pair<Result[], ScannerCallable>> f =
203        cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
204      if (f != null) {
205        // After poll, if f is not null, there must be a completed task
206        Pair<Result[], ScannerCallable> r = f.get();
207        if (r != null && r.getSecond() != null) {
208          updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
209        }
210        return r == null ? null : r.getFirst(); // great we got a response
211      }
212    } catch (ExecutionException e) {
213      // We ignore the ExecutionException and continue with the replicas
214      if (LOG.isDebugEnabled()) {
215        LOG.debug("Scan with primary region returns " + e.getCause());
216      }
217
218      // If rl's size is 1 or scan's consitency is strong, or scan is over specific replica,
219      // it needs to throw out the exception from the primary replica
220      if (
221        regionReplication == 1 || scan.getConsistency() == Consistency.STRONG
222          || scan.getReplicaId() >= 0
223      ) {
224        // Rethrow the first exception
225        RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
226      }
227      startIndex = 1;
228    } catch (CancellationException e) {
229      throw new InterruptedIOException(e.getMessage());
230    } catch (InterruptedException e) {
231      throw new InterruptedIOException(e.getMessage());
232    }
233
234    // submit call for the all of the secondaries at once
235    int endIndex = regionReplication;
236    if (scan.getConsistency() == Consistency.STRONG || scan.getReplicaId() >= 0) {
237      // When scan's consistency is strong or scan is over specific replica region, do not send to
238      // the secondaries
239      endIndex = 1;
240    } else {
241      // TODO: this may be an overkill for large region replication
242      addCallsForOtherReplicas(cs, 0, regionReplication - 1, rpcTimeoutForCall);
243    }
244
245    try {
246      Future<Pair<Result[], ScannerCallable>> f = cs.pollForFirstSuccessfullyCompletedTask(timeout,
247        TimeUnit.MILLISECONDS, startIndex, endIndex);
248
249      if (f == null) {
250        throw new IOException("Failed to get result within timeout, timeout=" + timeout + "ms");
251      }
252      Pair<Result[], ScannerCallable> r = f.get();
253
254      if (r != null && r.getSecond() != null) {
255        updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
256      }
257      return r == null ? null : r.getFirst(); // great we got an answer
258
259    } catch (ExecutionException e) {
260      RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
261    } catch (CancellationException e) {
262      throw new InterruptedIOException(e.getMessage());
263    } catch (InterruptedException e) {
264      throw new InterruptedIOException(e.getMessage());
265    } finally {
266      // We get there because we were interrupted or because one or more of the
267      // calls succeeded or failed. In all case, we stop all our tasks.
268      cs.cancelAll();
269    }
270    LOG.error("Imposible? Arrive at an unreachable line..."); // unreachable
271    throw new IOException("Imposible? Arrive at an unreachable line...");
272  }
273
274  @SuppressWarnings("FutureReturnValueIgnored")
275  private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
276    AtomicBoolean done, ExecutorService pool) {
277    if (done.compareAndSet(false, true)) {
278      if (currentScannerCallable != scanner) replicaSwitched.set(true);
279      currentScannerCallable = scanner;
280      // store where to start the replica scanner from if we need to.
281      if (result != null && result.length != 0) this.lastResult = result[result.length - 1];
282      if (LOG.isTraceEnabled()) {
283        LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId
284          + " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId());
285      }
286      // close all outstanding replica scanners but the one we heard back from
287      outstandingCallables.remove(scanner);
288      for (ScannerCallable s : outstandingCallables) {
289        if (LOG.isTraceEnabled()) {
290          LOG.trace("Closing scanner id=" + s.scannerId + ", replica="
291            + s.getHRegionInfo().getRegionId() + " because slow and replica="
292            + this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded");
293        }
294        // Submit the "close" to the pool since this might take time, and we don't
295        // want to wait for the "close" to happen yet. The "wait" will happen when
296        // the table is closed (when the awaitTermination of the underlying pool is called)
297        s.setClose();
298        final RetryingRPC r = new RetryingRPC(s);
299        pool.submit(new Callable<Void>() {
300          @Override
301          public Void call() throws Exception {
302            r.call(scannerTimeout);
303            return null;
304          }
305        });
306      }
307      // now clear outstandingCallables since we scheduled a close for all the contained scanners
308      outstandingCallables.clear();
309    }
310  }
311
312  /**
313   * When a scanner switches in the middle of scanning (the 'next' call fails for example), the
314   * upper layer {@link ClientScanner} needs to know
315   */
316  public boolean switchedToADifferentReplica() {
317    return replicaSwitched.get();
318  }
319
320  /**
321   * Returns true when the most recent RPC response indicated that the response was a heartbeat
322   * message. Heartbeat messages are sent back from the server when the processing of the scan
323   * request exceeds a certain time threshold. Heartbeats allow the server to avoid timeouts during
324   * long running scan operations.
325   */
326  public boolean isHeartbeatMessage() {
327    return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
328  }
329
330  public Cursor getCursor() {
331    return currentScannerCallable != null ? currentScannerCallable.getCursor() : null;
332  }
333
334  private void addCallsForCurrentReplica(
335    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int rpcTimeout) {
336    RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
337    outstandingCallables.add(currentScannerCallable);
338    cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, currentScannerCallable.id);
339  }
340
341  /**
342   * As we have a call sequence for scan, it is useless to have a different rpc timeout which is
343   * less than the scan timeout. If the server does not respond in time(usually this will not happen
344   * as we have heartbeat now), we will get an OutOfOrderScannerNextException when resending the
345   * next request and the only way to fix this is to close the scanner and open a new one.
346   * <p>
347   * The legacy behavior of ScannerCallable has been to use readRpcTimeout despite the above. If
348   * using legacy behavior, we always use that.
349   * <p>
350   * If new behavior is enabled, we determine the rpc timeout to use based on whether the scanner is
351   * open. If scanner is open, use scannerTimeout otherwise use readRpcTimeout.
352   */
353  private int getRpcTimeout() {
354    if (useScannerTimeoutForNextCalls) {
355      return isNextCall() ? scannerTimeout : readRpcTimeout;
356    } else {
357      return readRpcTimeout;
358    }
359  }
360
361  private boolean isNextCall() {
362    return currentScannerCallable != null && currentScannerCallable.scannerId != -1
363      && !currentScannerCallable.renew && !currentScannerCallable.closed;
364  }
365
366  private void addCallsForOtherReplicas(
367    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max,
368    int rpcTimeout) {
369
370    for (int id = min; id <= max; id++) {
371      if (currentScannerCallable.id == id) {
372        continue; // this was already scheduled earlier
373      }
374      ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
375      setStartRowForReplicaCallable(s);
376      outstandingCallables.add(s);
377      RetryingRPC retryingOnReplica = new RetryingRPC(s);
378      cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, id);
379    }
380  }
381
382  /**
383   * Set the start row for the replica callable based on the state of the last result received.
384   * @param callable The callable to set the start row on
385   */
386  private void setStartRowForReplicaCallable(ScannerCallable callable) {
387    if (this.lastResult == null || callable == null) {
388      return;
389    }
390    // 1. The last result was a partial result which means we have not received all of the cells
391    // for this row. Thus, use the last result's row as the start row. If a replica switch
392    // occurs, the scanner will ensure that any accumulated partial results are cleared,
393    // and the scan can resume from this row.
394    // 2. The last result was not a partial result which means it contained all of the cells for
395    // that row (we no longer need any information from it). Set the start row to the next
396    // closest row that could be seen.
397    callable.getScan().withStartRow(this.lastResult.getRow(),
398      this.lastResult.mayHaveMoreCellsInRow());
399  }
400
401  boolean isAnyRPCcancelled() {
402    return someRPCcancelled;
403  }
404
405  class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
406    final ScannerCallable callable;
407    RpcRetryingCaller<Result[]> caller;
408    private volatile boolean cancelled = false;
409
410    RetryingRPC(ScannerCallable callable) {
411      this.callable = callable;
412      // For the Consistency.STRONG (default case), we reuse the caller
413      // to keep compatibility with what is done in the past
414      // For the Consistency.TIMELINE case, we can't reuse the caller
415      // since we could be making parallel RPCs (caller.callWithRetries is synchronized
416      // and we can't invoke it multiple times at the same time)
417      this.caller = ScannerCallableWithReplicas.this.caller;
418      if (scan.getConsistency() == Consistency.TIMELINE) {
419        final ConnectionConfiguration connectionConfig = cConnection != null
420          ? cConnection.getConnectionConfiguration()
421          : new ConnectionConfiguration(ScannerCallableWithReplicas.this.conf);
422        this.caller =
423          RpcRetryingCallerFactory
424            .instantiate(ScannerCallableWithReplicas.this.conf, connectionConfig,
425              cConnection == null ? null : cConnection.getConnectionMetrics())
426            .<Result[]> newCaller();
427      }
428    }
429
430    @Override
431    public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
432      // since the retries is done within the ResultBoundedCompletionService,
433      // we don't invoke callWithRetries here
434      if (cancelled) {
435        return null;
436      }
437      Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
438      return new Pair<>(res, this.callable);
439    }
440
441    @Override
442    public void prepare(boolean reload) throws IOException {
443      if (cancelled) return;
444
445      if (Thread.interrupted()) {
446        throw new InterruptedIOException();
447      }
448
449      callable.prepare(reload);
450    }
451
452    @Override
453    public void throwable(Throwable t, boolean retrying) {
454      callable.throwable(t, retrying);
455    }
456
457    @Override
458    public String getExceptionMessageAdditionalDetail() {
459      return callable.getExceptionMessageAdditionalDetail();
460    }
461
462    @Override
463    public long sleep(long pause, int tries) {
464      return callable.sleep(pause, tries);
465    }
466
467    @Override
468    public void cancel() {
469      cancelled = true;
470      caller.cancel();
471      if (callable.getRpcController() != null) {
472        callable.getRpcController().startCancel();
473      }
474      someRPCcancelled = true;
475    }
476
477    @Override
478    public boolean isCancelled() {
479      return cancelled;
480    }
481  }
482
483  @Override
484  public void prepare(boolean reload) throws IOException {
485  }
486
487  @Override
488  public void throwable(Throwable t, boolean retrying) {
489    currentScannerCallable.throwable(t, retrying);
490  }
491
492  @Override
493  public String getExceptionMessageAdditionalDetail() {
494    return currentScannerCallable.getExceptionMessageAdditionalDetail();
495  }
496
497  @Override
498  public long sleep(long pause, int tries) {
499    return currentScannerCallable.sleep(pause, tries);
500  }
501}