/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;

/**
 * Scanner operations such as create, next, etc. Used by {@link ResultScanner}s made by
 * {@link Table}. Passed to a retrying caller such as {@link RpcRetryingCaller} so that failures
 * are retried.
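 * <p>
 * A minimal usage sketch (illustrative only; the exact wiring lives in the client scanner
 * implementations, and the caller and timeout names below are assumptions):
 *
 * <pre>
 * ScannerCallable callable = ...; // built by the client scanner for a region
 * callable.setCaching(100); // rows to request per next() RPC
 * // The retrying caller drives prepare() and then rpcCall(), retrying recoverable failures:
 * Result[] batch = caller.callWithRetries(callable, operationTimeout);
 * callable.setClose(); // the next invocation closes the server-side scanner
 * caller.callWithRetries(callable, operationTimeout);
 * </pre>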
 */
@InterfaceAudience.Private
public class ScannerCallable extends ClientServiceCallable<Result[]> {
  public static final String LOG_SCANNER_LATENCY_CUTOFF =
    "hbase.client.log.scanner.latency.cutoff";
  public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";

  // Keeping LOG public as it is being used in TestScannerHeartbeatMessages
  public static final Logger LOG = LoggerFactory.getLogger(ScannerCallable.class);
  protected long scannerId = -1L;
  protected boolean instantiated = false;
  protected boolean closed = false;
  protected boolean renew = false;
  protected final Scan scan;
  private int caching = 1;
  protected ScanMetrics scanMetrics;
  private boolean logScannerActivity = false;
  private int logCutOffLatency = 1000;
  protected final int id; // replica id

  enum MoreResults {
    YES,
    NO,
    UNKNOWN
  }

  private MoreResults moreResultsInRegion;
  private MoreResults moreResultsForScan;

  /**
   * Saves whether or not the most recent response from the server was a heartbeat message.
   * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}.
   */
  protected boolean heartbeatMessage = false;

  protected Cursor cursor;

  // indicate if it is a remote server call
  protected boolean isRegionServerRemote = true;
  private long nextCallSeq = 0;
  protected final RpcControllerFactory rpcControllerFactory;

  /**
   * @param connection           which connection
   * @param tableName            table callable is on
   * @param scan                 the scan to execute
   * @param scanMetrics          the ScanMetrics to use; if null, ScannerCallable won't collect
   *                             metrics
   * @param rpcControllerFactory factory to use when creating
   *                             {@link com.google.protobuf.RpcController}
   */
  public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
    ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id,
    Map<String, byte[]> requestAttributes) {
    super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController(),
      scan.getPriority(), requestAttributes);
    this.id = id;
    this.scan = scan;
    this.scanMetrics = scanMetrics;
    Configuration conf = connection.getConfiguration();
    logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
    logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
    this.rpcControllerFactory = rpcControllerFactory;
  }

  protected final HRegionLocation getLocationForReplica(RegionLocations locs)
    throws HBaseIOException {
    HRegionLocation loc = id < locs.size() ? locs.getRegionLocation(id) : null;
    if (loc == null || loc.getServerName() == null) {
      // With this exception, there will be a retry. The location can be null for a replica
      // when the table is created or after a split.
      throw new HBaseIOException("There is no location for replica id #" + id);
    }
    return loc;
  }

  /**
   * Fetch region locations for the row. Since this is for prepare, we always useCache. This is
   * because we can be sure that RpcRetryingCaller will have cleared the cache in error handling if
   * this is a retry.
   * @param row the row to look up the region location for
   */
  protected final RegionLocations getRegionLocationsForPrepare(byte[] row) throws IOException {
    // always use cache, because cache will have been cleared if necessary
    // in the try/catch before retrying
    return RpcRetryingCallerWithReadReplicas.getRegionLocations(true, id, getConnection(),
      getTableName(), row);
  }

  /**
   * @param reload force reload of server location
   */
  @Override
  public void prepare(boolean reload) throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException();
    }

    if (
      reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME)
        && getConnection().isTableDisabled(getTableName())
    ) {
      throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled.");
    }

    RegionLocations rl = getRegionLocationsForPrepare(getRow());
    location = getLocationForReplica(rl);
    ServerName dest = location.getServerName();
    setStub(super.getConnection().getClient(dest));
    if (!instantiated || reload) {
      checkIfRegionServerIsRemote();
      instantiated = true;
    }
    cursor = null;
    // check how often we retry.
    if (reload) {
      incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
    }
  }

  /**
   * Compare the local machine's hostname with the region server's hostname to decide whether the
   * hbase client is connecting to a remote region server.
   */
  protected void checkIfRegionServerIsRemote() {
    isRegionServerRemote = isRemote(getLocation().getHostname());
  }

  private ScanResponse next() throws IOException {
    // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
    setHeartbeatMessage(false);
    incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
    ScanRequest request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
      this.scanMetrics != null, renew, scan.getLimit());
    try {
      ScanResponse response = getStub().scan(getRpcController(), request);
      nextCallSeq++;
      return response;
    } catch (Exception e) {
      IOException ioe = ProtobufUtil.handleRemoteException(e);
      if (logScannerActivity) {
        LOG.info(
          "Got exception making request " + ProtobufUtil.toText(request) + " to " + getLocation(),
          e);
        if (ioe instanceof UnknownScannerException) {
          try {
            HRegionLocation location =
              getConnection().relocateRegion(getTableName(), scan.getStartRow());
            LOG.info("Scanner=" + scannerId + " expired, current region location is "
              + location.toString());
          } catch (Throwable t) {
            LOG.info("Failed to relocate region", t);
          }
        } else if (ioe instanceof ScannerResetException) {
          LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
            + "asked us to reset the scanner state.", ioe);
        }
      }
      // The conversion of exceptions into DoNotRetryIOExceptions below is a little strange.
      // Why not just have these exceptions implement DNRIOE? Well, usually we want
      // ServerCallable#withRetries to just retry when it gets these exceptions. Here, in a
      // scan, when doing a next in particular, we want to break out and get the scanner to
      // set itself up again. Throwing a DNRIOE is how we signal this to happen (it is ugly
      // and hard to follow, and in need of a refactor).
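      // For context (a hedged note, not from this file): RpcRetryingCaller treats
      // DoNotRetryIOException as non-retryable and rethrows it immediately; the client scanner
      // is then expected to catch it and re-open the scanner at the (possibly new) location,
      // which is the behavior the conversions below rely on.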
      if (ioe instanceof NotServingRegionException) {
        // Throw a DNRE so that we break out of cycle of calling NSRE
        // when what we need is to open scanner against new location.
        // Attach NSRE to signal client that it needs to re-setup scanner.
        if (this.scanMetrics != null) {
          this.scanMetrics.countOfNSRE.incrementAndGet();
        }
        throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
      } else if (ioe instanceof RegionServerStoppedException) {
        // Throw a DNRE so that we break out of cycle of the retries and instead go and
        // open scanner against new location.
        throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
      } else {
        // The outer layers will retry
        throw ioe;
      }
    }
  }

  private void setAlreadyClosed() {
    this.scannerId = -1L;
    this.closed = true;
  }

  @Override
  protected Result[] rpcCall() throws Exception {
    if (Thread.interrupted()) {
      throw new InterruptedIOException();
    }
    if (closed) {
      close();
      return null;
    }
    ScanResponse response;
    if (this.scannerId == -1L) {
      response = openScanner();
    } else {
      response = next();
    }
    long timestamp = EnvironmentEdgeManager.currentTime();
    boolean isHeartBeat = response.hasHeartbeatMessage() && response.getHeartbeatMessage();
    setHeartbeatMessage(isHeartBeat);
    if (isHeartBeat && scan.isNeedCursorResult() && response.hasCursor()) {
      cursor = ProtobufUtil.toCursor(response.getCursor());
    }
    Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
    if (logScannerActivity) {
      long now = EnvironmentEdgeManager.currentTime();
      if (now - timestamp > logCutOffLatency) {
        int rows = rrs == null ? 0 : rrs.length;
        LOG.info(
          "Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner=" + scannerId);
      }
    }
    updateServerSideMetrics(scanMetrics, response);
    // moreResults is only used for the case where a filter exhausts all elements
    if (response.hasMoreResults()) {
      if (response.getMoreResults()) {
        setMoreResultsForScan(MoreResults.YES);
      } else {
        setMoreResultsForScan(MoreResults.NO);
        setAlreadyClosed();
      }
    } else {
      setMoreResultsForScan(MoreResults.UNKNOWN);
    }
    if (response.hasMoreResultsInRegion()) {
      if (response.getMoreResultsInRegion()) {
        setMoreResultsInRegion(MoreResults.YES);
      } else {
        setMoreResultsInRegion(MoreResults.NO);
        setAlreadyClosed();
      }
    } else {
      setMoreResultsInRegion(MoreResults.UNKNOWN);
    }
    updateResultsMetrics(scanMetrics, rrs, isRegionServerRemote);
    return rrs;
  }

  /**
   * @return true when the most recent RPC response indicated that the response was a heartbeat
   *         message. Heartbeat messages are sent back from the server when the processing of the
   *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
   *         timeouts during long running scan operations.
   */
  boolean isHeartbeatMessage() {
    return heartbeatMessage;
  }
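  // Illustrative sketch (not the exact ClientScanner logic): after a successful call, a caller
  // can distinguish an empty heartbeat response from an exhausted scan roughly like this:
  //
  //   Result[] batch = caller.callWithRetries(callable, timeout);
  //   if (callable.isHeartbeatMessage()) {
  //     // No rows yet; optionally surface callable.getCursor() to report scan progress.
  //   } else if (callable.moreResultsForScan() == MoreResults.NO) {
  //     // The server reported the scan is exhausted; stop issuing next() calls.
  //   }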
  public Cursor getCursor() {
    return cursor;
  }

  private void setHeartbeatMessage(boolean heartbeatMessage) {
    this.heartbeatMessage = heartbeatMessage;
  }

  private void close() {
    if (this.scannerId == -1L) {
      return;
    }
    try {
      incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
      ScanRequest request =
        RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
      HBaseRpcController controller = rpcControllerFactory.newController();

      // Set fields from the original controller onto the close-specific controller.
      // We set the timeout and the priority -- we will overwrite the priority to HIGH
      // below, but the controller will take whichever is highest.
      if (getRpcController() instanceof HBaseRpcController) {
        HBaseRpcController original = (HBaseRpcController) getRpcController();
        controller.setPriority(original.getPriority());
        if (original.hasCallTimeout()) {
          controller.setCallTimeout(original.getCallTimeout());
        }
      }
      controller.setPriority(HConstants.HIGH_QOS);

      try {
        getStub().scan(controller, request);
      } catch (Exception e) {
        throw ProtobufUtil.handleRemoteException(e);
      }
    } catch (IOException e) {
      TableName table = getTableName();
      String tableDetails = (table == null) ? "" : (" on table: " + table.getNameAsString());
      LOG.warn(
        "Ignore, probably already closed. Current scan: " + getScan().toString() + tableDetails, e);
    }
    this.scannerId = -1L;
  }

  private ScanResponse openScanner() throws IOException {
    incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
    ScanRequest request = RequestConverter.buildScanRequest(
      getLocation().getRegionInfo().getRegionName(), this.scan, this.caching, false);
    try {
      ScanResponse response = getStub().scan(getRpcController(), request);
      long id = response.getScannerId();
      if (logScannerActivity) {
        LOG.info("Open scanner=" + id + " for scan=" + scan.toString() + " on region "
          + getLocation().toString());
      }
      if (response.hasMvccReadPoint()) {
        this.scan.setMvccReadPoint(response.getMvccReadPoint());
      }
      this.scannerId = id;
      return response;
    } catch (Exception e) {
      throw ProtobufUtil.handleRemoteException(e);
    }
  }

  protected Scan getScan() {
    return scan;
  }

  /**
   * Call this when the next invocation of call should close the scanner.
   */
  public void setClose() {
    this.closed = true;
  }

  /**
   * Indicate that the next call should only renew the scanner lease, without affecting the
   * scanner in any other way.
   * @param val true if only the lease should be renewed
   */
  public void setRenew(boolean val) {
    this.renew = val;
  }
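  // Usage note (a sketch, assuming the caller manages its own retrying caller and timeout):
  // lease-renewal machinery typically brackets a call with this flag, e.g.
  //   callable.setRenew(true);
  //   caller.callWithRetries(callable, timeout); // the server only renews the lease
  //   callable.setRenew(false);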
  /** Returns the HRegionInfo for the current region */
  @Override
  public HRegionInfo getHRegionInfo() {
    if (!instantiated) {
      return null;
    }
    return getLocation().getRegionInfo();
  }

  /**
   * Get the number of rows that will be fetched on each call to next()
   * @return the number of rows for caching
   */
  public int getCaching() {
    return caching;
  }

  /**
   * Set the number of rows that will be fetched on each call to next()
   * @param caching the number of rows for caching
   */
  public void setCaching(int caching) {
    this.caching = caching;
  }

  public ScannerCallable getScannerCallableForReplica(int id) {
    ScannerCallable s = new ScannerCallable(this.getConnection(), getTableName(), this.getScan(),
      this.scanMetrics, this.rpcControllerFactory, id, requestAttributes);
    s.setCaching(this.caching);
    return s;
  }

  /**
   * Should the client attempt to fetch more results from this region?
   */
  MoreResults moreResultsInRegion() {
    return moreResultsInRegion;
  }

  void setMoreResultsInRegion(MoreResults moreResults) {
    this.moreResultsInRegion = moreResults;
  }

  /**
   * Should the client attempt to fetch more results for the whole scan?
   */
  MoreResults moreResultsForScan() {
    return moreResultsForScan;
  }

  void setMoreResultsForScan(MoreResults moreResults) {
    this.moreResultsForScan = moreResults;
  }
}