001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
025
026import java.io.IOException;
027import java.io.InterruptedIOException;
028import java.util.Map;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.DoNotRetryIOException;
031import org.apache.hadoop.hbase.HBaseIOException;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.HRegionInfo;
034import org.apache.hadoop.hbase.HRegionLocation;
035import org.apache.hadoop.hbase.NotServingRegionException;
036import org.apache.hadoop.hbase.RegionLocations;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.TableNotEnabledException;
040import org.apache.hadoop.hbase.UnknownScannerException;
041import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
042import org.apache.hadoop.hbase.exceptions.ScannerResetException;
043import org.apache.hadoop.hbase.ipc.HBaseRpcController;
044import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
045import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
047import org.apache.yetus.audience.InterfaceAudience;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
052import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
053import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
056
/**
 * Scanner operations such as create, next, etc. Used by {@link ResultScanner}s made by
 * {@link Table}. Passed to a retrying caller such as {@link RpcRetryingCaller} so that failures
 * are retried.
 */
062@InterfaceAudience.Private
063public class ScannerCallable extends ClientServiceCallable<Result[]> {
064  public static final String LOG_SCANNER_LATENCY_CUTOFF = "hbase.client.log.scanner.latency.cutoff";
065  public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
066
067  // Keeping LOG public as it is being used in TestScannerHeartbeatMessages
068  public static final Logger LOG = LoggerFactory.getLogger(ScannerCallable.class);
069  protected long scannerId = -1L;
070  protected boolean instantiated = false;
071  protected boolean closed = false;
072  protected boolean renew = false;
073  protected final Scan scan;
074  private int caching = 1;
075  protected ScanMetrics scanMetrics;
076  private boolean logScannerActivity = false;
077  private int logCutOffLatency = 1000;
078  protected final int id;
079
080  enum MoreResults {
081    YES,
082    NO,
083    UNKNOWN
084  }
085
086  private MoreResults moreResultsInRegion;
087  private MoreResults moreResultsForScan;
088
089  /**
090   * Saves whether or not the most recent response from the server was a heartbeat message.
091   * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
092   */
093  protected boolean heartbeatMessage = false;
094
095  protected Cursor cursor;
096
097  // indicate if it is a remote server call
098  protected boolean isRegionServerRemote = true;
099  private long nextCallSeq = 0;
100  protected final RpcControllerFactory rpcControllerFactory;
101
102  /**
103   * @param connection           which connection
104   * @param tableName            table callable is on
105   * @param scan                 the scan to execute
106   * @param scanMetrics          the ScanMetrics to used, if it is null, ScannerCallable won't
107   *                             collect metrics
108   * @param rpcControllerFactory factory to use when creating
109   *                             {@link com.google.protobuf.RpcController}
110   */
111  public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
112    ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id,
113    Map<String, byte[]> requestAttributes) {
114    super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController(),
115      scan.getPriority(), requestAttributes);
116    this.id = id;
117    this.scan = scan;
118    this.scanMetrics = scanMetrics;
119    Configuration conf = connection.getConfiguration();
120    logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
121    logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
122    this.rpcControllerFactory = rpcControllerFactory;
123  }
124
125  protected final HRegionLocation getLocationForReplica(RegionLocations locs)
126    throws HBaseIOException {
127    HRegionLocation loc = id < locs.size() ? locs.getRegionLocation(id) : null;
128    if (loc == null || loc.getServerName() == null) {
129      // With this exception, there will be a retry. The location can be null for a replica
130      // when the table is created or after a split.
131      throw new HBaseIOException("There is no location for replica id #" + id);
132    }
133    return loc;
134  }
135
136  /**
137   * Fetch region locations for the row. Since this is for prepare, we always useCache. This is
138   * because we can be sure that RpcRetryingCaller will have cleared the cache in error handling if
139   * this is a retry.
140   * @param row the row to look up region location for
141   */
142  protected final RegionLocations getRegionLocationsForPrepare(byte[] row) throws IOException {
143    // always use cache, because cache will have been cleared if necessary
144    // in the try/catch before retrying
145    return RpcRetryingCallerWithReadReplicas.getRegionLocations(true, id, getConnection(),
146      getTableName(), row);
147  }
148
149  /**
150   * @param reload force reload of server location
151   */
152  @Override
153  public void prepare(boolean reload) throws IOException {
154    if (Thread.interrupted()) {
155      throw new InterruptedIOException();
156    }
157
158    if (
159      reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME)
160        && getConnection().isTableDisabled(getTableName())
161    ) {
162      throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled.");
163    }
164
165    RegionLocations rl = getRegionLocationsForPrepare(getRow());
166    location = getLocationForReplica(rl);
167    ServerName dest = location.getServerName();
168    setStub(super.getConnection().getClient(dest));
169    if (!instantiated || reload) {
170      checkIfRegionServerIsRemote();
171      instantiated = true;
172    }
173    cursor = null;
174    // check how often we retry.
175    if (reload) {
176      incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
177    }
178  }
179
180  /**
181   * compare the local machine hostname with region server's hostname to decide if hbase client
182   * connects to a remote region server
183   */
184  protected void checkIfRegionServerIsRemote() {
185    isRegionServerRemote = isRemote(getLocation().getHostname());
186  }
187
188  private ScanResponse next() throws IOException {
189    // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
190    setHeartbeatMessage(false);
191    incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
192    ScanRequest request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
193      this.scanMetrics != null, renew, scan.getLimit());
194    try {
195      ScanResponse response = getStub().scan(getRpcController(), request);
196      nextCallSeq++;
197      return response;
198    } catch (Exception e) {
199      IOException ioe = ProtobufUtil.handleRemoteException(e);
200      if (logScannerActivity) {
201        LOG.info(
202          "Got exception making request " + ProtobufUtil.toText(request) + " to " + getLocation(),
203          e);
204      }
205      if (logScannerActivity) {
206        if (ioe instanceof UnknownScannerException) {
207          try {
208            HRegionLocation location =
209              getConnection().relocateRegion(getTableName(), scan.getStartRow());
210            LOG.info("Scanner=" + scannerId + " expired, current region location is "
211              + location.toString());
212          } catch (Throwable t) {
213            LOG.info("Failed to relocate region", t);
214          }
215        } else if (ioe instanceof ScannerResetException) {
216          LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
217            + "asked us to reset the scanner state.", ioe);
218        }
219      }
220      // The below convertion of exceptions into DoNotRetryExceptions is a little strange.
221      // Why not just have these exceptions implment DNRIOE you ask? Well, usually we want
222      // ServerCallable#withRetries to just retry when it gets these exceptions. In here in
223      // a scan when doing a next in particular, we want to break out and get the scanner to
224      // reset itself up again. Throwing a DNRIOE is how we signal this to happen (its ugly,
225      // yeah and hard to follow and in need of a refactor).
226      if (ioe instanceof NotServingRegionException) {
227        // Throw a DNRE so that we break out of cycle of calling NSRE
228        // when what we need is to open scanner against new location.
229        // Attach NSRE to signal client that it needs to re-setup scanner.
230        if (this.scanMetrics != null) {
231          this.scanMetrics.countOfNSRE.incrementAndGet();
232        }
233        throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
234      } else if (ioe instanceof RegionServerStoppedException) {
235        // Throw a DNRE so that we break out of cycle of the retries and instead go and
236        // open scanner against new location.
237        throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
238      } else {
239        // The outer layers will retry
240        throw ioe;
241      }
242    }
243  }
244
245  private void setAlreadyClosed() {
246    this.scannerId = -1L;
247    this.closed = true;
248  }
249
250  @Override
251  protected Result[] rpcCall() throws Exception {
252    if (Thread.interrupted()) {
253      throw new InterruptedIOException();
254    }
255    if (closed) {
256      close();
257      return null;
258    }
259    ScanResponse response;
260    if (this.scannerId == -1L) {
261      response = openScanner();
262    } else {
263      response = next();
264    }
265    long timestamp = EnvironmentEdgeManager.currentTime();
266    boolean isHeartBeat = response.hasHeartbeatMessage() && response.getHeartbeatMessage();
267    setHeartbeatMessage(isHeartBeat);
268    if (isHeartBeat && scan.isNeedCursorResult() && response.hasCursor()) {
269      cursor = ProtobufUtil.toCursor(response.getCursor());
270    }
271    Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
272    if (logScannerActivity) {
273      long now = EnvironmentEdgeManager.currentTime();
274      if (now - timestamp > logCutOffLatency) {
275        int rows = rrs == null ? 0 : rrs.length;
276        LOG.info(
277          "Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner=" + scannerId);
278      }
279    }
280    updateServerSideMetrics(scanMetrics, response);
281    // moreResults is only used for the case where a filter exhausts all elements
282    if (response.hasMoreResults()) {
283      if (response.getMoreResults()) {
284        setMoreResultsForScan(MoreResults.YES);
285      } else {
286        setMoreResultsForScan(MoreResults.NO);
287        setAlreadyClosed();
288      }
289    } else {
290      setMoreResultsForScan(MoreResults.UNKNOWN);
291    }
292    if (response.hasMoreResultsInRegion()) {
293      if (response.getMoreResultsInRegion()) {
294        setMoreResultsInRegion(MoreResults.YES);
295      } else {
296        setMoreResultsInRegion(MoreResults.NO);
297        setAlreadyClosed();
298      }
299    } else {
300      setMoreResultsInRegion(MoreResults.UNKNOWN);
301    }
302    updateResultsMetrics(scanMetrics, rrs, isRegionServerRemote);
303    return rrs;
304  }
305
306  /**
307   * @return true when the most recent RPC response indicated that the response was a heartbeat
308   *         message. Heartbeat messages are sent back from the server when the processing of the
309   *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
310   *         timeouts during long running scan operations.
311   */
312  boolean isHeartbeatMessage() {
313    return heartbeatMessage;
314  }
315
316  public Cursor getCursor() {
317    return cursor;
318  }
319
320  private void setHeartbeatMessage(boolean heartbeatMessage) {
321    this.heartbeatMessage = heartbeatMessage;
322  }
323
324  private void close() {
325    if (this.scannerId == -1L) {
326      return;
327    }
328    try {
329      incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
330      ScanRequest request =
331        RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
332      HBaseRpcController controller = rpcControllerFactory.newController();
333
334      // Set fields from the original controller onto the close-specific controller
335      // We set the timeout and the priority -- we will overwrite the priority to HIGH
336      // below, but the controller will take whichever is highest.
337      if (getRpcController() instanceof HBaseRpcController) {
338        HBaseRpcController original = (HBaseRpcController) getRpcController();
339        controller.setPriority(original.getPriority());
340        if (original.hasCallTimeout()) {
341          controller.setCallTimeout(original.getCallTimeout());
342        }
343      }
344      controller.setPriority(HConstants.HIGH_QOS);
345
346      try {
347        getStub().scan(controller, request);
348      } catch (Exception e) {
349        throw ProtobufUtil.handleRemoteException(e);
350      }
351    } catch (IOException e) {
352      TableName table = getTableName();
353      String tableDetails = (table == null) ? "" : (" on table: " + table.getNameAsString());
354      LOG.warn(
355        "Ignore, probably already closed. Current scan: " + getScan().toString() + tableDetails, e);
356    }
357    this.scannerId = -1L;
358  }
359
360  private ScanResponse openScanner() throws IOException {
361    incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
362    ScanRequest request = RequestConverter.buildScanRequest(
363      getLocation().getRegionInfo().getRegionName(), this.scan, this.caching, false);
364    try {
365      ScanResponse response = getStub().scan(getRpcController(), request);
366      long id = response.getScannerId();
367      if (logScannerActivity) {
368        LOG.info("Open scanner=" + id + " for scan=" + scan.toString() + " on region "
369          + getLocation().toString());
370      }
371      if (response.hasMvccReadPoint()) {
372        this.scan.setMvccReadPoint(response.getMvccReadPoint());
373      }
374      this.scannerId = id;
375      return response;
376    } catch (Exception e) {
377      throw ProtobufUtil.handleRemoteException(e);
378    }
379  }
380
381  protected Scan getScan() {
382    return scan;
383  }
384
385  /**
386   * Call this when the next invocation of call should close the scanner
387   */
388  public void setClose() {
389    this.closed = true;
390  }
391
392  /**
393   * Indicate whether we make a call only to renew the lease, but without affected the scanner in
394   * any other way.
395   * @param val true if only the lease should be renewed
396   */
397  public void setRenew(boolean val) {
398    this.renew = val;
399  }
400
401  /** Returns the HRegionInfo for the current region */
402  @Override
403  public HRegionInfo getHRegionInfo() {
404    if (!instantiated) {
405      return null;
406    }
407    return getLocation().getRegionInfo();
408  }
409
410  /**
411   * Get the number of rows that will be fetched on next
412   * @return the number of rows for caching
413   */
414  public int getCaching() {
415    return caching;
416  }
417
418  /**
419   * Set the number of rows that will be fetched on next
420   * @param caching the number of rows for caching
421   */
422  public void setCaching(int caching) {
423    this.caching = caching;
424  }
425
426  public ScannerCallable getScannerCallableForReplica(int id) {
427    ScannerCallable s = new ScannerCallable(this.getConnection(), getTableName(), this.getScan(),
428      this.scanMetrics, this.rpcControllerFactory, id, requestAttributes);
429    s.setCaching(this.caching);
430    return s;
431  }
432
433  /**
434   * Should the client attempt to fetch more results from this region
435   */
436  MoreResults moreResultsInRegion() {
437    return moreResultsInRegion;
438  }
439
440  void setMoreResultsInRegion(MoreResults moreResults) {
441    this.moreResultsInRegion = moreResults;
442  }
443
444  /**
445   * Should the client attempt to fetch more results for the whole scan.
446   */
447  MoreResults moreResultsForScan() {
448    return moreResultsForScan;
449  }
450
451  void setMoreResultsForScan(MoreResults moreResults) {
452    this.moreResultsForScan = moreResults;
453  }
454}