001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.UncheckedIOException; 024import java.net.BindException; 025import java.net.InetAddress; 026import java.net.InetSocketAddress; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.Iterator; 032import java.util.List; 033import java.util.Map; 034import java.util.Map.Entry; 035import java.util.NavigableMap; 036import java.util.Set; 037import java.util.TreeSet; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentMap; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicBoolean; 042import java.util.concurrent.atomic.AtomicLong; 043import java.util.concurrent.atomic.LongAdder; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.fs.FileSystem; 046import org.apache.hadoop.fs.Path; 047import org.apache.hadoop.hbase.CacheEvictionStats; 048import 
org.apache.hadoop.hbase.CacheEvictionStatsBuilder; 049import org.apache.hadoop.hbase.Cell; 050import org.apache.hadoop.hbase.CellScanner; 051import org.apache.hadoop.hbase.CellUtil; 052import org.apache.hadoop.hbase.DoNotRetryIOException; 053import org.apache.hadoop.hbase.DroppedSnapshotException; 054import org.apache.hadoop.hbase.ExtendedCellScannable; 055import org.apache.hadoop.hbase.ExtendedCellScanner; 056import org.apache.hadoop.hbase.HBaseIOException; 057import org.apache.hadoop.hbase.HBaseRpcServicesBase; 058import org.apache.hadoop.hbase.HConstants; 059import org.apache.hadoop.hbase.MultiActionResultTooLarge; 060import org.apache.hadoop.hbase.NotServingRegionException; 061import org.apache.hadoop.hbase.PrivateCellUtil; 062import org.apache.hadoop.hbase.RegionTooBusyException; 063import org.apache.hadoop.hbase.Server; 064import org.apache.hadoop.hbase.ServerName; 065import org.apache.hadoop.hbase.TableName; 066import org.apache.hadoop.hbase.UnknownScannerException; 067import org.apache.hadoop.hbase.client.Append; 068import org.apache.hadoop.hbase.client.CheckAndMutate; 069import org.apache.hadoop.hbase.client.CheckAndMutateResult; 070import org.apache.hadoop.hbase.client.ClientInternalHelper; 071import org.apache.hadoop.hbase.client.Delete; 072import org.apache.hadoop.hbase.client.Durability; 073import org.apache.hadoop.hbase.client.Get; 074import org.apache.hadoop.hbase.client.Increment; 075import org.apache.hadoop.hbase.client.Mutation; 076import org.apache.hadoop.hbase.client.OperationWithAttributes; 077import org.apache.hadoop.hbase.client.Put; 078import org.apache.hadoop.hbase.client.QueryMetrics; 079import org.apache.hadoop.hbase.client.RegionInfo; 080import org.apache.hadoop.hbase.client.RegionReplicaUtil; 081import org.apache.hadoop.hbase.client.Result; 082import org.apache.hadoop.hbase.client.Row; 083import org.apache.hadoop.hbase.client.Scan; 084import org.apache.hadoop.hbase.client.TableDescriptor; 085import 
org.apache.hadoop.hbase.client.VersionInfoUtil; 086import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 087import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 088import org.apache.hadoop.hbase.exceptions.ScannerResetException; 089import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 090import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 091import org.apache.hadoop.hbase.io.ByteBuffAllocator; 092import org.apache.hadoop.hbase.io.hfile.BlockCache; 093import org.apache.hadoop.hbase.ipc.HBaseRpcController; 094import org.apache.hadoop.hbase.ipc.PriorityFunction; 095import org.apache.hadoop.hbase.ipc.QosPriority; 096import org.apache.hadoop.hbase.ipc.RpcCall; 097import org.apache.hadoop.hbase.ipc.RpcCallContext; 098import org.apache.hadoop.hbase.ipc.RpcCallback; 099import org.apache.hadoop.hbase.ipc.RpcServer; 100import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 101import org.apache.hadoop.hbase.ipc.RpcServerFactory; 102import org.apache.hadoop.hbase.ipc.RpcServerInterface; 103import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 104import org.apache.hadoop.hbase.ipc.ServerRpcController; 105import org.apache.hadoop.hbase.net.Address; 106import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 107import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; 108import org.apache.hadoop.hbase.quotas.OperationQuota; 109import org.apache.hadoop.hbase.quotas.QuotaUtil; 110import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 111import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 112import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 113import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; 114import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease; 115import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException; 116import org.apache.hadoop.hbase.regionserver.Region.Operation; 117import 
org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 118import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 119import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler; 120import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; 121import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; 122import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; 123import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; 124import org.apache.hadoop.hbase.replication.ReplicationUtils; 125import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker; 126import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker; 127import org.apache.hadoop.hbase.security.Superusers; 128import org.apache.hadoop.hbase.security.access.Permission; 129import org.apache.hadoop.hbase.util.Bytes; 130import org.apache.hadoop.hbase.util.DNS; 131import org.apache.hadoop.hbase.util.DNS.ServerType; 132import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 133import org.apache.hadoop.hbase.util.Pair; 134import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 135import org.apache.hadoop.hbase.wal.WAL; 136import org.apache.hadoop.hbase.wal.WALEdit; 137import org.apache.hadoop.hbase.wal.WALKey; 138import org.apache.hadoop.hbase.wal.WALSplitUtil; 139import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 140import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 141import org.apache.yetus.audience.InterfaceAudience; 142import org.slf4j.Logger; 143import org.slf4j.LoggerFactory; 144 145import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 146import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 147import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 148import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 149import 
org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 150import org.apache.hbase.thirdparty.com.google.protobuf.Message; 151import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 152import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 153import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 154import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 155import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 156 157import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 158import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 159import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 173import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 193import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 214import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; 236import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 242import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 248 249/** 250 * Implements the regionserver RPC services. 251 */ 252@InterfaceAudience.Private 253public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer> 254 implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface { 255 256 private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class); 257 258 /** RPC scheduler to use for the region server. */ 259 public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = 260 "hbase.region.server.rpc.scheduler.factory.class"; 261 262 /** 263 * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This 264 * configuration exists to prevent the scenario where a time limit is specified to be so 265 * restrictive that the time limit is reached immediately (before any cells are scanned). 
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Maximum cumulative result size allowed for a scan RPC; volatile because it may be
  // read and updated from different threads (updater not visible in this chunk).
  private volatile long maxScannerResultSize;

  // Produces the ids used as keys into the scanners map below.
  private ScannerIdGenerator scannerIdGenerator;
  // All currently open client scanners on this server, keyed by scanner name.
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name and last sequence number of a closed scanner for a while. This is used
  // to keep compatible for old clients which may send next or close request to a region
  // scanner which has already been exhausted. The entries will be removed automatically
  // after scannerLeaseTimeoutPeriod.
  private final Cache<String, Long> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private volatile int rowSizeWarnThreshold;
  /*
   * Whether we should reject requests with very high no of rows i.e. beyond threshold defined by
   * rowSizeWarnThreshold
   */
  private volatile boolean rejectRowsWithSizeOverThreshold;

  // NOTE(review): appears to guard the clear-compaction-queues admin RPC (handler not
  // visible in this chunk) — confirm against the clearCompactionQueues() implementation.
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below booleans
   * to selectively enable/disable these services (Rare is the case where you would ever turn off
   * one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
    "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
    "hbase.regionserver.client.executorService";
  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
    "hbase.regionserver.client.meta.executorService";
  public static final String REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG =
    "hbase.regionserver.bootstrap.nodes.executorService";

  /**
   * An Rpc callback for closing a RegionScanner.
357 */ 358 private static final class RegionScannerCloseCallBack implements RpcCallback { 359 360 private final RegionScanner scanner; 361 362 public RegionScannerCloseCallBack(RegionScanner scanner) { 363 this.scanner = scanner; 364 } 365 366 @Override 367 public void run() throws IOException { 368 this.scanner.close(); 369 } 370 } 371 372 /** 373 * An Rpc callback for doing shipped() call on a RegionScanner. 374 */ 375 private class RegionScannerShippedCallBack implements RpcCallback { 376 private final String scannerName; 377 private final Shipper shipper; 378 private final Lease lease; 379 380 public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) { 381 this.scannerName = scannerName; 382 this.shipper = shipper; 383 this.lease = lease; 384 } 385 386 @Override 387 public void run() throws IOException { 388 this.shipper.shipped(); 389 // We're done. On way out re-add the above removed lease. The lease was temp removed for this 390 // Rpc call and we are at end of the call now. Time to add it back. 391 if (scanners.containsKey(scannerName)) { 392 if (lease != null) { 393 server.getLeaseManager().addLease(lease); 394 } 395 } 396 } 397 } 398 399 /** 400 * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on 401 * completion of multiGets. 
   */
  static class RegionScannersCloseCallBack implements RpcCallback {
    // Scanners accumulated while serving a multi-get; all are closed when the call ends.
    private final List<RegionScanner> scanners = new ArrayList<>();

    public void addScanner(RegionScanner scanner) {
      this.scanners.add(scanner);
    }

    @Override
    public void run() {
      // Best-effort close: a failure closing one scanner must not stop the others.
      for (RegionScanner scanner : scanners) {
        try {
          scanner.close();
        } catch (IOException e) {
          LOG.error("Exception while closing the scanner " + scanner, e);
        }
      }
    }
  }

  /** Groups the per-request scanner state (name, holder, quota) passed between scan stages. */
  static class RegionScannerContext {
    final String scannerName;
    final RegionScannerHolder holder;
    final OperationQuota quota;

    RegionScannerContext(String scannerName, RegionScannerHolder holder, OperationQuota quota) {
      this.scannerName = scannerName;
      this.holder = holder;
      this.quota = quota;
    }
  }

  /**
   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
   */
  static final class RegionScannerHolder {
    // Sequence number expected from the client's next scan call; advanced only via CAS in
    // incNextCallSeq so two concurrent calls cannot both run against the scanner.
    private final AtomicLong nextCallSeq = new AtomicLong(0);
    private final RegionScanner s;
    private final HRegion r;
    private final RpcCallback closeCallBack;
    private final RpcCallback shippedCallback;
    private byte[] rowOfLastPartialResult;
    private boolean needCursor;
    private boolean fullRegionScan;
    private final String clientIPAndPort;
    private final String userName;
    // Block-bytes-scanned accounting across calls on this scanner. Volatile for
    // visibility only; the three fields are not updated atomically as a group.
    private volatile long maxBlockBytesScanned = 0;
    private volatile long prevBlockBytesScanned = 0;
    private volatile long prevBlockBytesScannedDifference = 0;

    RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
      RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
      String clientIPAndPort, String userName) {
      this.s = s;
      this.r = r;
      this.closeCallBack = closeCallBack;
      this.shippedCallback = shippedCallback;
      this.needCursor = needCursor;
      this.fullRegionScan = fullRegionScan;
      this.clientIPAndPort = clientIPAndPort;
      this.userName = userName;
    }

    long getNextCallSeq() {
      return nextCallSeq.get();
    }

    boolean incNextCallSeq(long currentSeq) {
      // Use CAS to prevent multiple scan request running on the same scanner.
      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
    }

    long getMaxBlockBytesScanned() {
      return maxBlockBytesScanned;
    }

    long getPrevBlockBytesScannedDifference() {
      return prevBlockBytesScannedDifference;
    }

    void updateBlockBytesScanned(long blockBytesScanned) {
      // Record the delta since the previous call, then track the running maximum.
      prevBlockBytesScannedDifference = blockBytesScanned - prevBlockBytesScanned;
      prevBlockBytesScanned = blockBytesScanned;
      if (blockBytesScanned > maxBlockBytesScanned) {
        maxBlockBytesScanned = blockBytesScanned;
      }
    }

    // Should be called only when we need to print lease expired messages otherwise
    // cache the String once made.
    @Override
    public String toString() {
      return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName
        + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString();
    }
  }

  /**
   * Instantiated as a scanner lease.
   * If the lease times out, the scanner is closed
   */
  private class ScannerListener implements LeaseListener {
    private final String scannerName;

    ScannerListener(final String n) {
      this.scannerName = n;
    }

    @Override
    public void leaseExpired() {
      // Detach first so no further client calls can reach the scanner while we close it.
      RegionScannerHolder rsh = scanners.remove(this.scannerName);
      if (rsh == null) {
        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
        return;
      }
      LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
      server.getMetrics().incrScannerLeaseExpired();
      RegionScanner s = rsh.s;
      HRegion region = null;
      try {
        // Fire the pre-close coprocessor hook if the region is still hosted here.
        region = server.getRegion(s.getRegionInfo().getRegionName());
        if (region != null && region.getCoprocessorHost() != null) {
          region.getCoprocessorHost().preScannerClose(s);
        }
      } catch (IOException e) {
        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
      } finally {
        // Always close the scanner even if the pre-close hook failed, then fire post-close.
        try {
          s.close();
          if (region != null && region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().postScannerClose(s);
          }
        } catch (IOException e) {
          LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
        }
      }
    }
  }

  /** Wraps a protobuf Result into a ResultOrException slot of a multi response. */
  private static ResultOrException getResultOrException(final ClientProtos.Result r,
    final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(r), index);
  }

  /** Wraps an exception into a ResultOrException slot of a multi response. */
  private static ResultOrException getResultOrException(final Exception e, final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(e), index);
  }

  /** Stamps the action index on the builder and builds the final ResultOrException. */
  private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
    final int index) {
    return builder.setIndex(index).build();
  }

  /**
   * Checks for the following pre-checks in order:
   * <ol>
   * <li>RegionServer is running</li>
   * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
   * </ol>
   * @param requestName name of rpc request. Used in reporting failures to provide context.
   * @throws ServiceException If any of the above listed pre-check fails.
   */
  private void rpcPreCheck(String requestName) throws ServiceException {
    try {
      checkOpen();
      requirePermission(requestName, Permission.Action.ADMIN);
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
  }

  /** Returns true if the client on the other end of this call can accept cell blocks. */
  private boolean isClientCellBlockSupport(RpcCallContext context) {
    return context != null && context.isClientCellBlockSupported();
  }

  // Copies a mutation result into the response builder: either as a cell block (cell data
  // travels out-of-band via the controller's cell scanner) or inline as a protobuf Result.
  private void addResult(final MutateResponse.Builder builder, final Result result,
    final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
    if (result == null) return;
    if (clientCellBlockSupported) {
      builder.setResult(ProtobufUtil.toResultNoData(result));
      rpcc.setCellScanner(result.cellScanner());
    } else {
      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
      builder.setResult(pbr);
    }
  }

  // Copies scan results into the response builder, marking them stale when not served by the
  // default replica. In cell-block mode only per-result cell counts and partial flags are
  // embedded; the cells themselves go out-of-band via the controller's cell scanner.
  private void addResults(ScanResponse.Builder builder, List<Result> results,
    HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
    builder.setStale(!isDefaultRegion);
    if (results.isEmpty()) {
      return;
    }
    if (clientCellBlockSupported) {
      for (Result res : results) {
        builder.addCellsPerResult(res.size());
        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
      }
      controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(results));
    } else {
      for (Result res : results) {
        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
        builder.addResults(pbr);
      }
    }
  }

  // Decodes and executes the mutations of a conditional (checkAndMutate) RegionAction,
  // applying cell-size and space-quota checks per mutation and routing through the
  // coprocessor pre/post hooks. countOfCompleteMutation tracks how many mutations were
  // fully decoded so the finally block can skip the cells of any undecoded mutations and
  // keep the shared cell scanner aligned for subsequent actions.
  private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
    CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      List<Mutation> mutations = new ArrayList<>();
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : actions) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto mutation = action.getMutation();
        MutationType type = mutation.getMutateType();
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(mutation, cellScanner);
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            mutations.add(put);
            break;
          case DELETE:
            // Note: no cell-size check for deletes (they carry no user values).
            Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
            ++countOfCompleteMutation;
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            mutations.add(del);
            break;
          case INCREMENT:
            Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, increment);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
            mutations.add(increment);
            break;
          case APPEND:
            Append append = ProtobufUtil.toAppend(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, append);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
            mutations.add(append);
            break;
          default:
            throw new DoNotRetryIOException("invalid mutation type : " + type);
        }
      }

      if (mutations.size() == 0) {
        // Nothing to do; report success with no result.
        return new CheckAndMutateResult(true, null);
      } else {
        CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
        CheckAndMutateResult result = null;
        if (region.getCoprocessorHost() != null) {
          result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
        }
        // A non-null pre-hook result bypasses the default operation entirely.
        if (result == null) {
          result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
          if (region.getCoprocessorHost() != null) {
            result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
          }
        }
        return result;
      }
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }

  /**
   * Execute an append mutation.
   * @return result to return to client if default operation should be bypassed as indicated by
   *         RegionObserver, null otherwise
   */
  private Result append(final HRegion region, final OperationQuota quota,
    final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
    checkCellSizeLimit(region, append);
    spaceQuota.getPolicyEnforcement(region).check(append);
    quota.addMutation(append);
    long blockBytesScannedBefore = context != null ?
context.getBlockBytesScanned() : 0; 699 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 700 Result r = region.append(append, nonceGroup, nonce); 701 if (server.getMetrics() != null) { 702 long blockBytesScanned = 703 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 704 server.getMetrics().updateAppend(region, EnvironmentEdgeManager.currentTime() - before, 705 blockBytesScanned); 706 } 707 return r == null ? Result.EMPTY_RESULT : r; 708 } 709 710 /** 711 * Execute an increment mutation. 712 */ 713 private Result increment(final HRegion region, final OperationQuota quota, 714 final MutationProto mutation, final CellScanner cells, long nonceGroup, 715 ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException { 716 long before = EnvironmentEdgeManager.currentTime(); 717 Increment increment = ProtobufUtil.toIncrement(mutation, cells); 718 checkCellSizeLimit(region, increment); 719 spaceQuota.getPolicyEnforcement(region).check(increment); 720 quota.addMutation(increment); 721 long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0; 722 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 723 Result r = region.increment(increment, nonceGroup, nonce); 724 final MetricsRegionServer metricsRegionServer = server.getMetrics(); 725 if (metricsRegionServer != null) { 726 long blockBytesScanned = 727 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 728 metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before, 729 blockBytesScanned); 730 } 731 return r == null ? Result.EMPTY_RESULT : r; 732 } 733 734 /** 735 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when 736 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation. 737 * @param cellsToReturn Could be null. May be allocated in this method. 
   *                      This is what this method returns as a 'result'.
   * @param closeCallBack the callback to be used with multigets
   * @param context       the current RpcCallContext
   * @return Return the <code>cellScanner</code> passed
   */
  private List<ExtendedCellScannable> doNonAtomicRegionMutation(final HRegion region,
    final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
    final RegionActionResult.Builder builder, List<ExtendedCellScannable> cellsToReturn,
    long nonceGroup, final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
    // one at a time, we instead pass them in batch. Be aware that the corresponding
    // ResultOrException instance that matches each Put or Delete is then added down in the
    // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
    // deferred/batched
    List<ClientProtos.Action> mutations = null;
    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
    IOException sizeIOE = null;
    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
      ResultOrException.newBuilder();
    boolean hasResultOrException = false;
    for (ClientProtos.Action action : actions.getActionList()) {
      hasResultOrException = false;
      resultOrExceptionBuilder.clear();
      try {
        Result r = null;
        long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
        if (
          context != null && context.isRetryImmediatelySupported()
            && (context.getResponseCellSize() > maxQuotaResultSize
              || blockBytesScannedBefore + context.getResponseExceptionSize() > maxQuotaResultSize)
        ) {

          // We're storing the exception since the exception and reason string won't
          // change after the response size limit is reached.
          if (sizeIOE == null) {
            // We don't need the stack un-winding so don't throw the exception.
            // Throwing will kill the JVM's JIT.
            //
            // Instead just create the exception and then store it.
            sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
              + context.getResponseCellSize() + " BlockSize: " + blockBytesScannedBefore);

            // Only report the exception once since there's only one request that
            // caused the exception. Otherwise this number will dominate the exceptions count.
            rpcServer.getMetrics().exception(sizeIOE);
          }

          // Now that the exception is known to be created,
          // use it for the response.
          //
          // This will create a copy in the builder.
          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
          resultOrExceptionBuilder.setException(pair);
          context.incrementResponseExceptionSize(pair.getSerializedSize());
          resultOrExceptionBuilder.setIndex(action.getIndex());
          builder.addResultOrException(resultOrExceptionBuilder.build());
          // Keep the shared cell scanner aligned with the actions we are skipping.
          skipCellsForMutation(action, cellScanner);
          continue;
        }
        if (action.hasGet()) {
          long before = EnvironmentEdgeManager.currentTime();
          ClientProtos.Get pbGet = action.getGet();
          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
          // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
              + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
              + "reverse Scan.");
          }
          try {
            Get get = ProtobufUtil.toGet(pbGet);
            if (context != null) {
              r = get(get, (region), closeCallBack, context);
            } else {
              r = region.get(get);
            }
          } finally {
            // Update get metrics even when the get throws.
            final MetricsRegionServer metricsRegionServer = server.getMetrics();
            if (metricsRegionServer != null) {
              long blockBytesScanned =
                context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
              metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
                blockBytesScanned);
            }
          }
        } else if (action.hasServiceCall()) {
          hasResultOrException = true;
          Message result = execServiceOnRegion(region, action.getServiceCall());
          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
            ClientProtos.CoprocessorServiceResult.newBuilder();
          resultOrExceptionBuilder.setServiceResult(serviceResultBuilder
            .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName())
              // TODO: Copy!!!
              .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
        } else if (action.hasMutation()) {
          MutationType type = action.getMutation().getMutateType();
          if (
            type != MutationType.PUT && type != MutationType.DELETE && mutations != null
              && !mutations.isEmpty()
          ) {
            // Flush out any Puts or Deletes already collected.
            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
              spaceQuotaEnforcement);
            mutations.clear();
          }
          switch (type) {
            case APPEND:
              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement, context);
              break;
            case INCREMENT:
              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement, context);
              break;
            case PUT:
            case DELETE:
              // Collect the individual mutations and apply in a batch
              if (mutations == null) {
                mutations = new ArrayList<>(actions.getActionCount());
              }
              mutations.add(action);
              break;
            default:
              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
          }
        } else {
          throw new HBaseIOException("Unexpected Action type");
        }
        if (r != null) {
          ClientProtos.Result pbResult = null;
          if (isClientCellBlockSupport(context)) {
            pbResult = ProtobufUtil.toResultNoData(r);
            // Hard to guess the size here. Just make a rough guess.
            if (cellsToReturn == null) {
              cellsToReturn = new ArrayList<>();
            }
            cellsToReturn.add(r);
          } else {
            pbResult = ProtobufUtil.toResult(r);
          }
          addSize(context, r);
          hasResultOrException = true;
          resultOrExceptionBuilder.setResult(pbResult);
        }
        // Could get to here and there was no result and no exception. Presumes we added
        // a Put or Delete to the collecting Mutations List for adding later. In this
        // case the corresponding ResultOrException instance for the Put or Delete will be added
        // down in the doNonAtomicBatchOp method call rather than up here.
      } catch (IOException ie) {
        rpcServer.getMetrics().exception(ie);
        hasResultOrException = true;
        NameBytesPair pair = ResponseConverter.buildException(ie);
        resultOrExceptionBuilder.setException(pair);
        context.incrementResponseExceptionSize(pair.getSerializedSize());
      }
      if (hasResultOrException) {
        // Propagate index.
        resultOrExceptionBuilder.setIndex(action.getIndex());
        builder.addResultOrException(resultOrExceptionBuilder.build());
      }
    }
    // Finish up any outstanding mutations
    if (!CollectionUtils.isEmpty(mutations)) {
      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
    }
    return cellsToReturn;
  }

  /**
   * Reject any single cell larger than the region's configured max cell size.
   * @throws DoNotRetryIOException if a cell in {@code m} exceeds {@code r.maxCellSize}
   */
  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
    if (r.maxCellSize > 0) {
      CellScanner cells = m.cellScanner();
      while (cells.advance()) {
        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
        if (size > r.maxCellSize) {
          String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of "
            + r.maxCellSize + " bytes";
          LOG.debug(msg);
          throw new DoNotRetryIOException(msg);
        }
      }
    }
  }

  /** Execute a batch of mutations atomically, propagating any IOException to the caller. */
  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Just throw the exception. The exception will be caught and then added to region-level
    // exception for RegionAction. Leaving the null to action result is ok since the null
    // result is viewed as failure by hbase client. And the region-level exception will be used
    // to replace the null result. see AsyncRequestFutureImpl#receiveMultiAction and
    // AsyncBatchRpcRetryingCaller#onComplete for more details.
    doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
  }

  /**
   * Execute a batch of mutations non-atomically; on failure the same exception is recorded
   * per-action instead of as a region-level exception.
   */
  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    try {
      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
        spaceQuotaEnforcement, false);
    } catch (IOException e) {
      // Set the exception for each action. The mutations in same RegionAction are group to
      // different batch and then be processed individually. Hence, we don't set the region-level
      // exception here for whole RegionAction.
      for (Action mutation : mutations) {
        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
      }
    }
  }

  /**
   * Execute a list of mutations.
   */
  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
    throws IOException {
    Mutation[] mArray = new Mutation[mutations.size()];
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      /**
       * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions
       * since mutation array may have been reordered. In order to return the right result or
       * exception to the corresponding actions, we need to know which action the mutation belongs
       * to. We can't sort ClientProtos.Action array, since they are bonded to cellscanners.
       */
      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
      int i = 0;
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : mutations) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto m = action.getMutation();
        Mutation mutation;
        switch (m.getMutateType()) {
          case PUT:
            mutation = ProtobufUtil.toPut(m, cells);
            batchContainsPuts = true;
            break;

          case DELETE:
            mutation = ProtobufUtil.toDelete(m, cells);
            batchContainsDelete = true;
            break;

          case INCREMENT:
            mutation = ProtobufUtil.toIncrement(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          case APPEND:
            mutation = ProtobufUtil.toAppend(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          default:
            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
        }
        mutationActionMap.put(mutation, action);
        mArray[i++] = mutation;
        checkCellSizeLimit(region, mutation);
        // Check if a space quota disallows this mutation
        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
        quota.addMutation(mutation);
      }

      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }

      // HBASE-17924
      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
      // order is preserved as its expected from the client
      if (!atomic) {
        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
      }

      OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);

      // When atomic is true, it indicates that the mutateRow API or the batch API with
      // RowMutations is called. In this case, we need to merge the results of the
      // Increment/Append operations if the mutations include those operations, and set the merged
      // result to the first element of the ResultOrException list
      if (atomic) {
        List<ResultOrException> resultOrExceptions = new ArrayList<>();
        List<Result> results = new ArrayList<>();
        for (i = 0; i < codes.length; i++) {
          if (codes[i].getResult() != null) {
            results.add(codes[i].getResult());
          }
          if (i != 0) {
            resultOrExceptions
              .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i));
          }
        }

        if (results.isEmpty()) {
          builder.addResultOrException(
            getResultOrException(ClientProtos.Result.getDefaultInstance(), 0));
        } else {
          // Merge the results of the Increment/Append operations
          List<Cell> cellList = new ArrayList<>();
          for (Result result : results) {
            if (result.rawCells() != null) {
              cellList.addAll(Arrays.asList(result.rawCells()));
            }
          }
          Result result = Result.create(cellList);

          // Set the merged result of the Increment/Append operations to the first element of the
          // ResultOrException list
          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
        }

        builder.addAllResultOrException(resultOrExceptions);
        return;
      }

      // Non-atomic path: translate each per-mutation status code back to the client-visible
      // index recorded in the original Action.
      for (i = 0; i < codes.length; i++) {
        Mutation currentMutation = mArray[i];
        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
        Exception e;
        switch (codes[i].getOperationStatusCode()) {
          case BAD_FAMILY:
            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SANITY_CHECK_FAILURE:
            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          default:
            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SUCCESS:
            ClientProtos.Result result = codes[i].getResult() == null
              ? ClientProtos.Result.getDefaultInstance()
              : ProtobufUtil.toResult(codes[i].getResult());
            builder.addResultOrException(getResultOrException(result, index));
            break;

          case STORE_TOO_BUSY:
            e = new RegionTooBusyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;
        }
      }
    } finally {
      int processedMutationIndex = 0;
      for (Action mutation : mutations) {
        // The non-null mArray[i] means the cell scanner has been read.
        if (mArray[processedMutationIndex++] == null) {
          skipCellsForMutation(mutation, cells);
        }
      }
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  /** Record put/delete batch latencies against the region server metrics, if metrics are on. */
  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
    boolean batchContainsDelete) {
    final MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      if (batchContainsPuts) {
        metricsRegionServer.updatePutBatch(region, after - starttime);
      }
      if (batchContainsDelete) {
        metricsRegionServer.updateDeleteBatch(region, after - starttime);
      }
    }
  }

  /**
   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
   *         exceptionMessage if any
   * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying
   *             edits for secondary replicas any more, see
   *             {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}.
1130 */ 1131 @Deprecated 1132 private OperationStatus[] doReplayBatchOp(final HRegion region, 1133 final List<MutationReplay> mutations, long replaySeqId) throws IOException { 1134 long before = EnvironmentEdgeManager.currentTime(); 1135 boolean batchContainsPuts = false, batchContainsDelete = false; 1136 try { 1137 for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) { 1138 MutationReplay m = it.next(); 1139 1140 if (m.getType() == MutationType.PUT) { 1141 batchContainsPuts = true; 1142 } else { 1143 batchContainsDelete = true; 1144 } 1145 1146 NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap(); 1147 List<Cell> metaCells = map.get(WALEdit.METAFAMILY); 1148 if (metaCells != null && !metaCells.isEmpty()) { 1149 for (Cell metaCell : metaCells) { 1150 CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell); 1151 boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); 1152 HRegion hRegion = region; 1153 if (compactionDesc != null) { 1154 // replay the compaction. 
Remove the files from stores only if we are the primary 1155 // region replica (thus own the files) 1156 hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica, 1157 replaySeqId); 1158 continue; 1159 } 1160 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell); 1161 if (flushDesc != null && !isDefaultReplica) { 1162 hRegion.replayWALFlushMarker(flushDesc, replaySeqId); 1163 continue; 1164 } 1165 RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell); 1166 if (regionEvent != null && !isDefaultReplica) { 1167 hRegion.replayWALRegionEventMarker(regionEvent); 1168 continue; 1169 } 1170 BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell); 1171 if (bulkLoadEvent != null) { 1172 hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent); 1173 continue; 1174 } 1175 } 1176 it.remove(); 1177 } 1178 } 1179 requestCount.increment(); 1180 if (!region.getRegionInfo().isMetaRegion()) { 1181 server.getMemStoreFlusher().reclaimMemStoreMemory(); 1182 } 1183 return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]), 1184 replaySeqId); 1185 } finally { 1186 updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete); 1187 } 1188 } 1189 1190 private void closeAllScanners() { 1191 // Close any outstanding scanners. Means they'll get an UnknownScanner 1192 // exception next time they come in. 
    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
      try {
        e.getValue().s.close();
      } catch (IOException ioe) {
        // Best-effort close: log and keep closing the remaining scanners.
        LOG.warn("Closing scanner " + e.getKey(), ioe);
      }
    }
  }

  // Directly invoked only for testing
  public RSRpcServices(final HRegionServer rs) throws IOException {
    super(rs, rs.getProcessName());
    final Configuration conf = rs.getConfiguration();
    setReloadableGuardrails(conf);
    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
    rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
    // Closed-scanner ids are remembered for one lease period so late calls can be answered.
    closedScanners = CacheBuilder.newBuilder()
      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }

  @Override
  protected boolean defaultReservoirEnabled() {
    return true;
  }

  @Override
  protected ServerType getDNSServerType() {
    return DNS.ServerType.REGIONSERVER;
  }

  @Override
  protected String getHostname(Configuration conf, String defaultHostname) {
    return conf.get("hbase.regionserver.ipc.address", defaultHostname);
  }

  @Override
  protected String getPortConfigName() {
    return HConstants.REGIONSERVER_PORT;
  }

  @Override
  protected int getDefaultPort() {
    return HConstants.DEFAULT_REGIONSERVER_PORT;
  }

  @Override
  protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  protected RpcServerInterface createRpcServer(final Server server,
    final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
    final String name) throws IOException {
    final Configuration conf = server.getConfiguration();
    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
    try {
      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, // use final
        // bindAddress
        // for this
        // server.
        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
    } catch (BindException be) {
      throw new IOException(be.getMessage() + ". To switch ports use the '"
        + HConstants.REGIONSERVER_PORT + "' configuration property.",
        be.getCause() != null ? be.getCause() : be);
    }
  }

  protected Class<?> getRpcSchedulerFactoryClass() {
    final Configuration conf = server.getConfiguration();
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  protected PriorityFunction createPriority() {
    return new RSAnnotationReadingPriorityFunction(this);
  }

  /** Returns the number of currently open region scanners on this server. */
  public int getScannersCount() {
    return scanners.size();
  }

  /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
  RegionScanner getScanner(long scannerId) {
    RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId);
    return rsh == null ? null : rsh.s;
  }

  /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null.
   */
  private RegionScannerHolder checkQuotaAndGetRegionScannerContext(long scannerId) {
    return scanners.get(toScannerName(scannerId));
  }

  /** Returns a human-readable summary (table/region/operation id) for an open scanner, or null. */
  public String getScanDetailsWithId(long scannerId) {
    RegionScanner scanner = getScanner(scannerId);
    if (scanner == null) {
      return null;
    }
    StringBuilder builder = new StringBuilder();
    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
    builder.append(" operation_id: ").append(scanner.getOperationId());
    return builder.toString();
  }

  /** Returns a human-readable summary for a ScanRequest, or null if the region can't be found. */
  public String getScanDetailsWithRequest(ScanRequest request) {
    try {
      if (!request.hasRegion()) {
        return null;
      }
      Region region = getRegion(request.getRegion());
      StringBuilder builder = new StringBuilder();
      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
      for (NameBytesPair pair : request.getScan().getAttributeList()) {
        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
          break;
        }
      }
      return builder.toString();
    } catch (IOException ignored) {
      // Region lookup failed; there is nothing useful to report.
      return null;
    }
  }

  /**
   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
   */
  long getScannerVirtualTime(long scannerId) {
    RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId);
    return rsh == null ? 0L : rsh.getNextCallSeq();
  }

  /**
   * Method to account for the size of retained cells. Adds the estimated serialized size of every
   * cell in {@code r} to the context's response cell size.
   * @param context rpc call context
   * @param r       result to add size.
   */
  void addSize(RpcCallContext context, Result r) {
    if (context != null && r != null && !r.isEmpty()) {
      for (Cell c : r.rawCells()) {
        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
      }
    }
  }

  /** Returns Remote client's ip and port, else the empty string if it can't be determined. */
  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
    link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
  static String getRemoteClientIpAndPort() {
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    if (rpcCall == null) {
      return HConstants.EMPTY_STRING;
    }
    InetAddress address = rpcCall.getRemoteAddress();
    if (address == null) {
      return HConstants.EMPTY_STRING;
    }
    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
    // scanning than a hostname anyways.
    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
  }

  /** Returns Remote client's username.
   */
  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
    link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
  static String getUserName() {
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    if (rpcCall == null) {
      return HConstants.EMPTY_STRING;
    }
    return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING);
  }

  /**
   * Register a new RegionScanner under {@code scannerName}: create its lease and the callbacks
   * fired on ship/close, then publish the holder in the scanners map.
   */
  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
    HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
    Lease lease = server.getLeaseManager().createLease(scannerName, this.scannerLeaseTimeoutPeriod,
      new ScannerListener(scannerName));
    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
    RpcCallback closeCallback =
      s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s);
    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
      needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
      + scannerName + ", " + existing;
    return rsh;
  }

  private boolean isFullRegionScan(Scan scan, HRegion region) {
    // If the scan start row equals or less than the start key of the region
    // and stop row greater than equals end key (if stop row present)
    // or if the stop row is empty
    // account this as a full region scan
    if (
      Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0
        && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0
          && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)
          || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
    ) {
      return true;
    }
    return false;
  }

  /**
   * Find the HRegion based on a region specifier
   * @param regionSpecifier the region specifier
   * @return the corresponding region
   * @throws IOException if the specifier is not null, but failed to find the region
   */
  public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException {
    return server.getRegion(regionSpecifier.getValue().toByteArray());
  }

  /**
   * Find the List of HRegions based on a list of region specifiers
   * @param regionSpecifiers the list of region specifiers
   * @return the corresponding list of regions
   * @throws IOException if any of the specifiers is not null, but failed to find the region
   */
  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
    final CacheEvictionStatsBuilder stats) {
    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
    for (RegionSpecifier regionSpecifier : regionSpecifiers) {
      try {
        regions.add(server.getRegion(regionSpecifier.getValue().toByteArray()));
      } catch (NotServingRegionException e) {
        // Record the miss in the stats instead of failing the whole list.
        stats.addException(regionSpecifier.getValue().toByteArray(), e);
      }
    }
    return regions;
  }
  PriorityFunction getPriority() {
    return priority;
  }

  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return server.getRegionServerRpcQuotaManager();
  }

  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return server.getRegionServerSpaceQuotaManager();
  }

  void start(ZKWatcher zkWatcher) {
    this.scannerIdGenerator = new ScannerIdGenerator(this.server.getServerName());
    internalStart(zkWatcher);
  }

  void stop() {
    closeAllScanners();
    internalStop();
  }

  /**
   * Called to verify that this server is up and running.
   */
  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
  protected void checkOpen() throws IOException {
    // Check aborted before stopped: an aborting server is also stopping, and callers should
    // see the more specific exception.
    if (server.isAborted()) {
      throw new RegionServerAbortedException("Server " + server.getServerName() + " aborting");
    }
    if (server.isStopped()) {
      throw new RegionServerStoppedException("Server " + server.getServerName() + " stopping");
    }
    if (!server.isDataFileSystemOk()) {
      throw new RegionServerStoppedException("File system not available");
    }
    if (!server.isOnline()) {
      throw new ServerNotRunningYetException(
        "Server " + server.getServerName() + " is not running yet");
    }
  }

  /**
   * By default, put up an Admin and a Client Service. Set booleans
   * <code>hbase.regionserver.admin.executorService</code> and
   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
   * Default is that both are enabled.
1483 * @return immutable list of blocking services and the security info classes that this server 1484 * supports 1485 */ 1486 protected List<BlockingServiceAndInterface> getServices() { 1487 boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true); 1488 boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true); 1489 boolean clientMeta = 1490 getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true); 1491 boolean bootstrapNodes = 1492 getConfiguration().getBoolean(REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG, true); 1493 List<BlockingServiceAndInterface> bssi = new ArrayList<>(); 1494 if (client) { 1495 bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this), 1496 ClientService.BlockingInterface.class)); 1497 } 1498 if (admin) { 1499 bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this), 1500 AdminService.BlockingInterface.class)); 1501 } 1502 if (clientMeta) { 1503 bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this), 1504 ClientMetaService.BlockingInterface.class)); 1505 } 1506 if (bootstrapNodes) { 1507 bssi.add( 1508 new BlockingServiceAndInterface(BootstrapNodeService.newReflectiveBlockingService(this), 1509 BootstrapNodeService.BlockingInterface.class)); 1510 } 1511 return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build(); 1512 } 1513 1514 /** 1515 * Close a region on the region server. 1516 * @param controller the RPC controller 1517 * @param request the request 1518 */ 1519 @Override 1520 @QosPriority(priority = HConstants.ADMIN_QOS) 1521 public CloseRegionResponse closeRegion(final RpcController controller, 1522 final CloseRegionRequest request) throws ServiceException { 1523 final ServerName sn = (request.hasDestinationServer() 1524 ? 
ProtobufUtil.toServerName(request.getDestinationServer())
      : null);

    try {
      checkOpen();
      throwOnWrongStartCode(request);
      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());

      requestCount.increment();
      if (sn == null) {
        LOG.info("Close " + encodedRegionName + " without moving");
      } else {
        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
      }
      boolean closed = server.closeRegion(encodedRegionName, false, sn);
      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Compact a region on the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public CompactRegionResponse compactRegion(final RpcController controller,
    final CompactRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      // Quota support is enabled, the requesting user is not system/super user
      // and a quota policy is enforced that disables compactions.
      if (
        QuotaUtil.isQuotaEnabled(getConfiguration())
          && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null))
          && this.server.getRegionServerSpaceQuotaManager()
            .areCompactionsDisabled(region.getTableDescriptor().getTableName())
      ) {
        throw new DoNotRetryIOException(
          "Compactions on this region are " + "disabled due to a space quota violation.");
      }
      region.startRegionOperation(Operation.COMPACT_REGION);
      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
      boolean major = request.hasMajor() && request.getMajor();
      if (request.hasFamily()) {
        // Family-scoped compaction: only the requested column family is compacted.
        byte[] family = request.getFamily().toByteArray();
        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
          + region.getRegionInfo().getRegionNameAsString() + " and family "
          + Bytes.toString(family);
        LOG.trace(log);
        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
          CompactionLifeCycleTracker.DUMMY);
      } else {
        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
          + region.getRegionInfo().getRegionNameAsString();
        LOG.trace(log);
        region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
      }
      return CompactRegionResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  // Enable or disable compactions on this server; returns the previous state so the caller
  // can restore it later.
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public CompactionSwitchResponse compactionSwitch(RpcController controller,
    CompactionSwitchRequest request) throws ServiceException {
    rpcPreCheck("compactionSwitch");
    final CompactSplit compactSplitThread = server.getCompactSplitThread();
    requestCount.increment();
    boolean prevState = compactSplitThread.isCompactionsEnabled();
    CompactionSwitchResponse response =
      CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
    if (prevState == request.getEnabled()) {
      // passed in requested state is same as current state. No action required
      return response;
    }
    compactSplitThread.switchCompaction(request.getEnabled());
    return response;
  }

  /**
   * Flush a region on the region server.
1613 * @param controller the RPC controller 1614 * @param request the request 1615 */ 1616 @Override 1617 @QosPriority(priority = HConstants.ADMIN_QOS) 1618 public FlushRegionResponse flushRegion(final RpcController controller, 1619 final FlushRegionRequest request) throws ServiceException { 1620 try { 1621 checkOpen(); 1622 requestCount.increment(); 1623 HRegion region = getRegion(request.getRegion()); 1624 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); 1625 boolean shouldFlush = true; 1626 if (request.hasIfOlderThanTs()) { 1627 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs(); 1628 } 1629 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); 1630 if (shouldFlush) { 1631 boolean writeFlushWalMarker = 1632 request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false; 1633 // Go behind the curtain so we can manage writing of the flush WAL marker 1634 HRegion.FlushResultImpl flushResult = null; 1635 if (request.hasFamily()) { 1636 List<byte[]> families = new ArrayList(); 1637 families.add(request.getFamily().toByteArray()); 1638 TableDescriptor tableDescriptor = region.getTableDescriptor(); 1639 List<String> noSuchFamilies = families.stream() 1640 .filter(f -> !tableDescriptor.hasColumnFamily(f)).map(Bytes::toString).toList(); 1641 if (!noSuchFamilies.isEmpty()) { 1642 throw new NoSuchColumnFamilyException("Column families " + noSuchFamilies 1643 + " don't exist in table " + tableDescriptor.getTableName().getNameAsString()); 1644 } 1645 flushResult = 1646 region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1647 } else { 1648 flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1649 } 1650 boolean compactionNeeded = flushResult.isCompactionNeeded(); 1651 if (compactionNeeded) { 1652 server.getCompactSplitThread().requestSystemCompaction(region, 1653 "Compaction through user triggered flush"); 1654 } 1655 
builder.setFlushed(flushResult.isFlushSucceeded()); 1656 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker); 1657 } 1658 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores()); 1659 return builder.build(); 1660 } catch (DroppedSnapshotException ex) { 1661 // Cache flush can fail in a few places. If it fails in a critical 1662 // section, we get a DroppedSnapshotException and a replay of wal 1663 // is required. Currently the only way to do this is a restart of 1664 // the server. 1665 server.abort("Replay of WAL required. Forcing server shutdown", ex); 1666 throw new ServiceException(ex); 1667 } catch (IOException ie) { 1668 throw new ServiceException(ie); 1669 } 1670 } 1671 1672 @Override 1673 @QosPriority(priority = HConstants.ADMIN_QOS) 1674 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, 1675 final GetOnlineRegionRequest request) throws ServiceException { 1676 try { 1677 checkOpen(); 1678 requestCount.increment(); 1679 Map<String, HRegion> onlineRegions = server.getOnlineRegions(); 1680 List<RegionInfo> list = new ArrayList<>(onlineRegions.size()); 1681 for (HRegion region : onlineRegions.values()) { 1682 list.add(region.getRegionInfo()); 1683 } 1684 list.sort(RegionInfo.COMPARATOR); 1685 return ResponseConverter.buildGetOnlineRegionResponse(list); 1686 } catch (IOException ie) { 1687 throw new ServiceException(ie); 1688 } 1689 } 1690 1691 // Master implementation of this Admin Service differs given it is not 1692 // able to supply detail only known to RegionServer. See note on 1693 // MasterRpcServers#getRegionInfo. 
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
    final GetRegionInfoRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      RegionInfo info = region.getRegionInfo();
      byte[] bestSplitRow;
      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
        bestSplitRow = region.checkSplit(true).orElse(null);
        // when all table data are in memstore, bestSplitRow = null
        // try to flush region first
        if (bestSplitRow == null) {
          region.flush(true);
          bestSplitRow = region.checkSplit(true).orElse(null);
        }
      } else {
        bestSplitRow = null;
      }
      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
      if (request.hasCompactionState() && request.getCompactionState()) {
        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
      }
      builder.setSplittable(region.isSplittable());
      builder.setMergeable(region.isMergeable());
      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
    throws ServiceException {

    // Either all regions of one table, or every region on this server.
    List<HRegion> regions;
    if (request.hasTableName()) {
      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
      regions = server.getRegions(tableName);
    } else {
      regions = server.getRegions();
    }
    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
    // Builders are created once and reused for every region.
    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();

    try {
      for (HRegion region : regions) {
        rLoads.add(server.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
      }
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
    builder.addAllRegionLoads(rLoads);
    return builder.build();
  }

  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
    ClearCompactionQueuesRequest request) throws ServiceException {
    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
      + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
    requestCount.increment();
    // CAS guard so only one admin clears the queues at a time.
    if (clearCompactionQueues.compareAndSet(false, true)) {
      final CompactSplit compactSplitThread = server.getCompactSplitThread();
      try {
        checkOpen();
        server.getRegionServerCoprocessorHost().preClearCompactionQueues();
        for (String queueName : request.getQueueNameList()) {
          LOG.debug("clear " + queueName + " compaction queue");
          switch (queueName) {
            case "long":
              compactSplitThread.clearLongCompactionsQueue();
              break;
            case "short":
              compactSplitThread.clearShortCompactionsQueue();
              break;
            default:
              LOG.warn("Unknown queue name " + queueName);
              throw new IOException("Unknown queue name " + queueName);
          }
        }
        server.getRegionServerCoprocessorHost().postClearCompactionQueues();
      } catch (IOException ie) {
        throw new ServiceException(ie);
      } finally {
        clearCompactionQueues.set(false);
      }
    }
else {
      LOG.warn("Clear compactions queue is executing by other admin.");
    }
    return respBuilder.build();
  }

  /**
   * Get some information of the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetServerInfoResponse getServerInfo(final RpcController controller,
    final GetServerInfoRequest request) throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
    requestCount.increment();
    // -1 signals that no info server is running.
    int infoPort = server.getInfoServer() != null ? server.getInfoServer().getPort() : -1;
    return ResponseConverter.buildGetServerInfoResponse(server.getServerName(), infoPort);
  }

  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetStoreFileResponse getStoreFile(final RpcController controller,
    final GetStoreFileRequest request) throws ServiceException {
    try {
      checkOpen();
      HRegion region = getRegion(request.getRegion());
      requestCount.increment();
      // Empty family list in the request means "all families of the table".
      Set<byte[]> columnFamilies;
      if (request.getFamilyCount() == 0) {
        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
      } else {
        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
        for (ByteString cf : request.getFamilyList()) {
          columnFamilies.add(cf.toByteArray());
        }
      }
      int nCF = columnFamilies.size();
      List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
      builder.addAllStoreFile(fileList);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  // The request-typed overloads below all delegate to throwOnWrongStartCode(long); a missing
  // start code is tolerated (logged) for compatibility.
  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }

  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
    // check that we are the same server that this RPC is intended for.
    if (server.getServerName().getStartcode() != serverStartCode) {
      throw new ServiceException(new DoNotRetryIOException(
        "This RPC was intended for a " + "different server with startCode: " + serverStartCode
          + ", this server is: " + server.getServerName()));
    }
  }

  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
    if (req.getOpenRegionCount() > 0) {
      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
        throwOnWrongStartCode(openReq);
      }
    }
    if (req.getCloseRegionCount() > 0) {
      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
        throwOnWrongStartCode(closeReq);
      }
    }
  }

  /**
   * Open asynchronously a region or a set of regions on the region server. The opening is
   * coordinated by ZooKeeper, and this method requires the znode to be created before being called.
   * As a consequence, this method should be called only from the master.
   * <p>
   * Different manages states for the region are:
   * </p>
   * <ul>
   * <li>region not opened: the region opening will start asynchronously.</li>
   * <li>a close is already in progress: this is considered as an error.</li>
   * <li>an open is already in progress: this new open request will be ignored.
This is important
   * because the Master can do multiple requests if it crashes.</li>
   * <li>the region is already opened: this new open request will be ignored.</li>
   * </ul>
   * <p>
   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
   * errors are put in the response as FAILED_OPENING.
   * </p>
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public OpenRegionResponse openRegion(final RpcController controller,
    final OpenRegionRequest request) throws ServiceException {
    requestCount.increment();
    throwOnWrongStartCode(request);

    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
    final int regionCount = request.getOpenInfoCount();
    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
    final boolean isBulkAssign = regionCount > 1;
    try {
      checkOpen();
    } catch (IOException ie) {
      // checkOpen failed: only tolerate this when the single region being assigned is meta.
      TableName tableName = null;
      if (regionCount == 1) {
        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
          request.getOpenInfo(0).getRegion();
        if (ri != null) {
          tableName = ProtobufUtil.toTableName(ri.getTableName());
        }
      }
      if (!TableName.META_TABLE_NAME.equals(tableName)) {
        throw new ServiceException(ie);
      }
      // We are assigning meta, wait a little for regionserver to finish initialization.
      // Default to quarter of RPC timeout
      int timeout = server.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
      synchronized (server.online) {
        try {
          while (
            EnvironmentEdgeManager.currentTime() <= endTime && !server.isStopped()
              && !server.isOnline()
          ) {
            server.online.wait(server.getMsgInterval());
          }
          checkOpen();
        } catch (InterruptedException t) {
          Thread.currentThread().interrupt();
          throw new ServiceException(t);
        } catch (IOException e) {
          throw new ServiceException(e);
        }
      }
    }

    // -1 means the master did not supply its system time.
    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;

    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableDescriptor htd;
      try {
        String encodedName = region.getEncodedName();
        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
        final HRegion onlineRegion = server.getRegion(encodedName);
        if (onlineRegion != null) {
          // The region is already online. This should not happen any more.
          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
            + ", which is already online";
          LOG.warn(error);
          // server.abort(error);
          // throw new IOException(error);
          builder.addOpeningState(RegionOpeningState.OPENED);
          continue;
        }
        LOG.info("Open " + region.getRegionNameAsString());

        final Boolean previous =
          server.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);

        if (Boolean.FALSE.equals(previous)) {
          if (server.getRegion(encodedName) != null) {
            // There is a close in progress. This should not happen any more.
            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
              + ", which we are already trying to CLOSE";
            server.abort(error);
            throw new IOException(error);
          }
          server.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
        }

        if (Boolean.TRUE.equals(previous)) {
          // An open is in progress. This is supported, but let's log this.
          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
            + ", which we are already trying to OPEN"
            + " - ignoring this new request for this region.");
        }

        // We are opening this region. If it moves back and forth for whatever reason, we don't
        // want to keep returning the stale moved record while we are opening/if we close again.
        server.removeFromMovedRegions(region.getEncodedName());

        if (previous == null || !previous.booleanValue()) {
          htd = htds.get(region.getTable());
          if (htd == null) {
            htd = server.getTableDescriptors().get(region.getTable());
            htds.put(region.getTable(), htd);
          }
          if (htd == null) {
            throw new IOException("Missing table descriptor for " + region.getEncodedName());
          }
          // If there is no action in progress, we can submit a specific handler.
          // Need to pass the expected version in the constructor.
          if (server.getExecutorService() == null) {
            LOG.info("No executor executorService; skipping open request");
          } else {
            if (region.isMetaRegion()) {
              server.getExecutorService()
                .submit(new OpenMetaHandler(server, server, region, htd, masterSystemTime));
            } else {
              if (regionOpenInfo.getFavoredNodesCount() > 0) {
                server.updateRegionFavoredNodesMapping(region.getEncodedName(),
                  regionOpenInfo.getFavoredNodesList());
              }
              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
                server.getExecutorService().submit(
                  new OpenPriorityRegionHandler(server, server, region, htd, masterSystemTime));
              } else {
                server.getExecutorService()
                  .submit(new OpenRegionHandler(server, server, region, htd, masterSystemTime));
              }
            }
          }
        }

        builder.addOpeningState(RegionOpeningState.OPENED);
      } catch (IOException ie) {
        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
        if (isBulkAssign) {
          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
        } else {
          throw new ServiceException(ie);
        }
      }
    }
    return builder.build();
  }

  /**
   * Warmup a region on this server. This method should only be called by Master. It synchronously
   * opens the region and closes the region bringing the most important pages in cache.
   */
  @Override
  public WarmupRegionResponse warmupRegion(final RpcController controller,
    final WarmupRegionRequest request) throws ServiceException {
    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
    try {
      checkOpen();
      String encodedName = region.getEncodedName();
      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
      final HRegion onlineRegion = server.getRegion(encodedName);
      // Warmup is pointless if the region is already online or mid-transition here.
      if (onlineRegion != null) {
        LOG.info("{} is online; skipping warmup", region);
        return response;
      }
      TableDescriptor htd = server.getTableDescriptors().get(region.getTable());
      if (server.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
        LOG.info("{} is in transition; skipping warmup", region);
        return response;
      }
      LOG.info("Warmup {}", region.getRegionNameAsString());
      HRegion.warmupHRegion(region, htd, server.getWAL(region), server.getConfiguration(), server,
        null);
    } catch (IOException ie) {
      LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
      throw new ServiceException(ie);
    }

    return response;
  }

  // Take the cell scanner off the controller so it is consumed exactly once by the caller.
  private ExtendedCellScanner getAndReset(RpcController controller) {
    HBaseRpcController hrc = (HBaseRpcController) controller;
    ExtendedCellScanner cells = hrc.cellScanner();
    hrc.setCellScanner(null);
    return cells;
  }

  /**
   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
   * that the given mutations will be durable on the receiving RS if this method returns without any
   * exception.
   * @param controller the RPC controller
   * @param request    the request
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for
   *             compatibility with old region replica implementation. Now we will use
   *             {@code replicateToReplica} method instead.
   */
  @Deprecated
  @Override
  @QosPriority(priority = HConstants.REPLAY_QOS)
  public ReplicateWALEntryResponse replay(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    ExtendedCellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      RegionCoprocessorHost coprocessorHost =
        ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
          ? region.getCoprocessorHost()
          : null; // do not invoke coprocessors if this is a secondary region replica

      // Skip adding the edits to WAL if this is a secondary region replica
      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;

      for (WALEntry entry : entries) {
        // All entries in one replay request must target the same region.
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException("Replay request contains entries from multiple "
            + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
            + entry.getKey().getEncodedRegionName());
        }
        if (server.nonceManager != null && isPrimary) {
          long nonceGroup =
            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          server.nonceManager.reportOperationFromWal(nonceGroup, nonce,
            entry.getKey().getWriteTime());
        }
        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
        List<MutationReplay> edits =
          WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
        if (edits != null && !edits.isEmpty()) {
          // HBASE-17924
          // sort to improve lock efficiency
          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
          long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
            ? entry.getKey().getOrigSequenceNumber()
            : entry.getKey().getLogSequenceNumber();
          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
          // check if it's a partial success
          for (int i = 0; result != null && i < result.length; i++) {
            if (result[i] != OperationStatus.SUCCESS) {
              throw new IOException(result[i].getExceptionMsg());
            }
          }
        }
      }

      // sync wal at the end because ASYNC_WAL is used above
      WAL wal = region.getWAL();
      if (wal != null) {
        wal.sync();
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record replay latency even on failure.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
      }
    }
  }

  /**
   * Replay the given changes on a secondary replica
   */
  @Override
  public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
    ReplicateWALEntryRequest request) throws ServiceException {
    CellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return
ReplicateWALEntryResponse.newBuilder().build();
      }
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      // This path is for secondary replicas only; hitting a primary replica is a caller bug.
      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
        throw new DoNotRetryIOException(
          "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
      }
      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException(
            "ReplicateToReplica request contains entries from multiple " + "regions. First region:"
              + regionName.toStringUtf8() + " , other region:"
              + entry.getKey().getEncodedRegionName());
        }
        region.replayWALEntry(entry, cells);
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
    ReplicationSourceService replicationSource = server.getReplicationSourceService();
    if (replicationSource == null || entries.isEmpty()) {
      return;
    }
    // We can ensure that all entries are for one peer, so only need to check one entry's
    // table name. if the table hit sync replication at peer side and the peer cluster
    // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply
    // those entries according to the design doc.
    TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray());
    if (
      replicationSource.getSyncReplicationPeerInfoProvider().checkState(table,
        RejectReplicationRequestStateChecker.get())
    ) {
      throw new DoNotRetryIOException(
        "Reject to apply to sink cluster because sync replication state of sink cluster "
          + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table);
    }
  }

  /**
   * Replicate WAL entries on the region server.
   * @param controller the RPC controller
   * @param request    the request
   */
  @Override
  @QosPriority(priority = HConstants.REPLICATION_QOS)
  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
    try {
      checkOpen();
      if (server.getReplicationSinkService() != null) {
        requestCount.increment();
        List<WALEntry> entries = request.getEntryList();
        checkShouldRejectReplicationRequest(entries);
        ExtendedCellScanner cellScanner = getAndReset(controller);
        // Coprocessor hooks bracket the actual sink write.
        server.getRegionServerCoprocessorHost().preReplicateLogEntries();
        server.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
          request.getSourceHFileArchiveDirPath());
        server.getRegionServerCoprocessorHost().postReplicateLogEntries();
        return ReplicateWALEntryResponse.newBuilder().build();
      } else {
        throw new ServiceException("Replication services are not initialized yet");
      }
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Roll the WAL writer of the region server.
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public RollWALWriterResponse rollWALWriter(final RpcController controller,
    final RollWALWriterRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      // Coprocessor hooks bracket the roll request; the roll itself is asynchronous
      // (requestRollAll only requests a roll of all WALs, it does not wait for completion).
      server.getRegionServerCoprocessorHost().preRollWALWriterRequest();
      server.getWalRoller().requestRollAll();
      server.getRegionServerCoprocessorHost().postRollWALWriterRequest();
      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Stop the region server.
   * @param controller the RPC controller
   * @param request the request
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public StopServerResponse stopServer(final RpcController controller,
    final StopServerRequest request) throws ServiceException {
    rpcPreCheck("stopServer");
    requestCount.increment();
    String reason = request.getReason();
    server.stop(reason);
    return StopServerResponse.newBuilder().build();
  }

  /**
   * Updates the favored-node mapping for each region in the request that carries a non-empty
   * favored node list. Regions without favored nodes are skipped. The response reports the total
   * number of region entries received (including skipped ones).
   */
  @Override
  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
    UpdateFavoredNodesRequest request) throws ServiceException {
    rpcPreCheck("updateFavoredNodes");
    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
        server.updateRegionFavoredNodesMapping(hri.getEncodedName(),
          regionUpdateInfo.getFavoredNodesList());
      }
    }
    respBuilder.setResponse(openInfoList.size());
    return respBuilder.build();
  }

  /**
   * Atomically bulk load several HFiles into an open region
   * @return true if successful, false is failed but recoverably (no action)
   * @throws ServiceException if failed unrecoverably
   */
  @Override
  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
    final BulkLoadHFileRequest request) throws ServiceException {
    long start = EnvironmentEdgeManager.currentTime();
    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
    if (clusterIds.contains(this.server.getClusterId())) {
      // Our cluster id is already in the list: this load has passed through this cluster
      // before, so report success without applying it again.
      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
    } else {
      // Record that this cluster has now seen the load before forwarding downstream.
      clusterIds.add(this.server.getClusterId());
    }
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
      // -1 means "size unknown / not computed"; see the negative-size note below.
      long sizeToBeLoaded = -1;

      // Check to see if this bulk load would exceed the space quota for this table
      if (spaceQuotaEnabled) {
        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
        SpaceViolationPolicyEnforcement enforcement =
          activeSpaceQuotas.getPolicyEnforcement(region);
        if (enforcement != null) {
          // Bulk loads must still be atomic. We must enact all or none.
          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
          for (FamilyPath familyPath : request.getFamilyPathList()) {
            filePaths.add(familyPath.getPath());
          }
          // Check if the batch of files exceeds the current quota
          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
        }
      }
      // secure bulk load
      Map<byte[], List<Path>> map =
        server.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
      builder.setLoaded(map != null);
      if (map != null) {
        // Treat any negative size as a flag to "ignore" updating the region size as that is
        // not possible to occur in real life (cannot bulk load a file with negative size)
        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
              + sizeToBeLoaded + " bytes");
          }
          // Inform space quotas of the new files for this region
          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
            sizeToBeLoaded);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Always record bulk load latency, even on failure paths.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
      }
    }
  }

  @Override
  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
    PrepareBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      String bulkToken = server.getSecureBulkLoadManager().prepareBulkLoad(region, request);
      PrepareBulkLoadResponse.Builder builder =
        PrepareBulkLoadResponse.newBuilder();
      builder.setBulkToken(bulkToken);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  @Override
  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
    CleanupBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      server.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
      return CleanupBulkLoadResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Executes a region coprocessor endpoint call and returns its serialized result, tagged with
   * the region name and the result's class name.
   */
  @Override
  public CoprocessorServiceResponse execService(final RpcController controller,
    final CoprocessorServiceRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      Message result = execServiceOnRegion(region, request.getCall());
      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
        region.getRegionInfo().getRegionName()));
      // TODO: COPIES!!!!!!
      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
        org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Resolves the filesystem the bulk load source files live on: the server's own filesystem when
   * the list is empty, otherwise the filesystem of the first path (all paths are assumed to be on
   * the same filesystem).
   */
  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
    if (filePaths.isEmpty()) {
      // local hdfs
      return server.getFileSystem();
    }
    // source hdfs
    return new Path(filePaths.get(0)).getFileSystem(server.getConfiguration());
  }

  private Message execServiceOnRegion(HRegion region,
    final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
    // ignore the passed in controller (from the serialized call)
    ServerRpcController execController = new ServerRpcController();
    return region.execService(execController, serviceCall);
  }

  /**
   * Returns true when client requests against this region must be rejected because the region's
   * table is in a sync replication state that disallows client traffic (STANDBY side).
   */
  private boolean shouldRejectRequestsFromClient(HRegion region) {
    TableName table = region.getRegionInfo().getTable();
    ReplicationSourceService service = server.getReplicationSourceService();
    return service != null && service.getSyncReplicationPeerInfoProvider().checkState(table,
      RejectRequestsFromClientStateChecker.get());
  }

  /**
   * Throws if the region is in STANDBY state; used by read/write RPC paths to fail client
   * requests early.
   */
  private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
    if (shouldRejectRequestsFromClient(region)) {
      throw new DoNotRetryIOException(
        region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state.");
    }
  }

  /**
   * Get data from a table.
   * @param controller the RPC controller
   * @param request the get request
   */
  @Override
  public GetResponse get(final RpcController controller, final GetRequest request)
    throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    OperationQuota quota = null;
    HRegion region = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    try {
      checkOpen();
      requestCount.increment();
      rpcGetRequestCount.increment();
      region = getRegion(request.getRegion());
      rejectIfInStandByState(region);

      GetResponse.Builder builder = GetResponse.newBuilder();
      ClientProtos.Get get = request.getGet();
      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
          + "reverse Scan.");
      }
      Boolean existence = null;
      Result r = null;
      quota = getRpcQuotaManager().checkBatchQuota(region, OperationQuota.OperationType.GET);

      Get clientGet = ProtobufUtil.toGet(get);
      // An existence-only get lets the preExists coprocessor hook short-circuit the read.
      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
        existence = region.getCoprocessorHost().preExists(clientGet);
      }
      if (existence == null) {
        if (context != null) {
          r = get(clientGet, (region), null, context);
        } else {
          // for test purpose
          r = region.get(clientGet);
        }
        if (get.getExistenceOnly()) {
          boolean exists = r.getExists();
          if (region.getCoprocessorHost() != null) {
            exists = region.getCoprocessorHost().postExists(clientGet, exists);
          }
          existence = exists;
        }
      }
      if (existence != null) {
        // Existence-only response: encode the boolean, marking stale for non-default replicas.
        ClientProtos.Result pbr =
          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
        builder.setResult(pbr);
      } else if (r != null) {
        ClientProtos.Result pbr;
        if (
          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
        ) {
          // Cell-block capable client (>= 1.3): send metadata-only result and ship the cells
          // out-of-band via the controller's cell scanner.
          pbr = ProtobufUtil.toResultNoData(r);
          ((HBaseRpcController) controller).setCellScanner(
            PrivateCellUtil.createExtendedCellScanner(ClientInternalHelper.getExtendedRawCells(r)));
          addSize(context, r);
        } else {
          pbr = ProtobufUtil.toResult(r);
        }
        builder.setResult(pbr);
      }
      // r.cells is null when an table.exists(get) call
      if (r != null && r.rawCells() != null) {
        quota.addGetResult(r);
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null && region != null) {
        long blockBytesScanned = context != null ? context.getBlockBytesScanned() : 0;
        metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
          blockBytesScanned);
      }
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Performs a Get as a single-row Scan, wiring the created scanner into either the current
   * RpcCallContext (closed by the rpc callback) or the supplied close callback (closed when the
   * enclosing multi() completes).
   * <p>
   * NOTE(review): {@code context} is dereferenced unconditionally below, so callers must pass a
   * non-null context (the get() RPC path guarantees this by falling back to region.get when the
   * context is null).
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
    RpcCallContext context) throws IOException {
    region.prepareGet(get);
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        // The hook bypassed the core get; return whatever it populated.
        region.metricsUpdateForGet();
        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
          stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    long blockBytesScannedBefore = context.getBlockBytesScanned();
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet();

    Result r =
      Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
    if (get.isQueryMetricsEnabled()) {
      // Report only the blocks scanned by this get, not the whole call.
      long blockBytesScanned = context.getBlockBytesScanned() - blockBytesScannedBefore;
      r.setMetrics(new QueryMetrics(blockBytesScanned));
    }
    return r;
  }

  /**
   * Warns (and optionally rejects, per {@code rejectRowsWithSizeOverThreshold}) multi requests
   * whose total action count across all RegionActions exceeds {@code rowSizeWarnThreshold}.
   * See HBASE-18023.
   */
  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
    int sum = 0;
    String firstRegionName = null;
    for (RegionAction regionAction : request.getRegionActionList()) {
      if (sum == 0) {
        // Remember the first region for the log/error message.
        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
      }
      sum += regionAction.getActionCount();
    }
    if (sum > rowSizeWarnThreshold) {
      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
        + ") (HBASE-18023)."
+ " Requested Number of Rows: " + sum + " Client: " 2620 + RpcServer.getRequestUserName().orElse(null) + "/" 2621 + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName); 2622 if (rejectRowsWithSizeOverThreshold) { 2623 throw new ServiceException( 2624 "Rejecting large batch operation for current batch with firstRegionName: " 2625 + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: " 2626 + rowSizeWarnThreshold); 2627 } 2628 } 2629 } 2630 2631 private void failRegionAction(MultiResponse.Builder responseBuilder, 2632 RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction, 2633 CellScanner cellScanner, Throwable error) { 2634 rpcServer.getMetrics().exception(error); 2635 regionActionResultBuilder.setException(ResponseConverter.buildException(error)); 2636 responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); 2637 // All Mutations in this RegionAction not executed as we can not see the Region online here 2638 // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner 2639 // corresponding to these Mutations. 2640 if (cellScanner != null) { 2641 skipCellsForMutations(regionAction.getActionList(), cellScanner); 2642 } 2643 } 2644 2645 private boolean isReplicationRequest(Action action) { 2646 // replication request can only be put or delete. 2647 if (!action.hasMutation()) { 2648 return false; 2649 } 2650 MutationProto mutation = action.getMutation(); 2651 MutationType type = mutation.getMutateType(); 2652 if (type != MutationType.PUT && type != MutationType.DELETE) { 2653 return false; 2654 } 2655 // replication will set a special attribute so we can make use of it to decide whether a request 2656 // is for replication. 
2657 return mutation.getAttributeList().stream().map(p -> p.getName()) 2658 .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent(); 2659 } 2660 2661 /** 2662 * Execute multiple actions on a table: get, mutate, and/or execCoprocessor 2663 * @param rpcc the RPC controller 2664 * @param request the multi request 2665 */ 2666 @Override 2667 public MultiResponse multi(final RpcController rpcc, final MultiRequest request) 2668 throws ServiceException { 2669 try { 2670 checkOpen(); 2671 } catch (IOException ie) { 2672 throw new ServiceException(ie); 2673 } 2674 2675 checkBatchSizeAndLogLargeSize(request); 2676 2677 // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data. 2678 // It is also the conduit via which we pass back data. 2679 HBaseRpcController controller = (HBaseRpcController) rpcc; 2680 CellScanner cellScanner = controller != null ? getAndReset(controller) : null; 2681 2682 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; 2683 2684 MultiResponse.Builder responseBuilder = MultiResponse.newBuilder(); 2685 RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder(); 2686 this.rpcMultiRequestCount.increment(); 2687 this.requestCount.increment(); 2688 ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements(); 2689 2690 // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The 2691 // following logic is for backward compatibility as old clients still use 2692 // MultiRequest#condition in case of checkAndMutate with RowMutations. 2693 if (request.hasCondition()) { 2694 if (request.getRegionActionList().isEmpty()) { 2695 // If the region action list is empty, do nothing. 
2696 responseBuilder.setProcessed(true); 2697 return responseBuilder.build(); 2698 } 2699 2700 RegionAction regionAction = request.getRegionAction(0); 2701 2702 // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So 2703 // we can assume regionAction.getAtomic() is true here. 2704 assert regionAction.getAtomic(); 2705 2706 OperationQuota quota; 2707 HRegion region; 2708 RegionSpecifier regionSpecifier = regionAction.getRegion(); 2709 2710 try { 2711 region = getRegion(regionSpecifier); 2712 quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(), 2713 regionAction.hasCondition()); 2714 } catch (IOException e) { 2715 failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e); 2716 return responseBuilder.build(); 2717 } 2718 2719 try { 2720 boolean rejectIfFromClient = shouldRejectRequestsFromClient(region); 2721 // We only allow replication in standby state and it will not set the atomic flag. 2722 if (rejectIfFromClient) { 2723 failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, 2724 new DoNotRetryIOException( 2725 region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state")); 2726 return responseBuilder.build(); 2727 } 2728 2729 try { 2730 CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(), 2731 cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement); 2732 responseBuilder.setProcessed(result.isSuccess()); 2733 ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = 2734 ClientProtos.ResultOrException.newBuilder(); 2735 for (int i = 0; i < regionAction.getActionCount(); i++) { 2736 // To unify the response format with doNonAtomicRegionMutation and read through 2737 // client's AsyncProcess we have to add an empty result instance per operation 2738 resultOrExceptionOrBuilder.clear(); 2739 resultOrExceptionOrBuilder.setIndex(i); 2740 
regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build()); 2741 } 2742 } catch (IOException e) { 2743 rpcServer.getMetrics().exception(e); 2744 // As it's an atomic operation with a condition, we may expect it's a global failure. 2745 regionActionResultBuilder.setException(ResponseConverter.buildException(e)); 2746 } 2747 } finally { 2748 quota.close(); 2749 } 2750 2751 responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); 2752 ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics(); 2753 if (regionLoadStats != null) { 2754 responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder() 2755 .addRegion(regionSpecifier).addStat(regionLoadStats).build()); 2756 } 2757 return responseBuilder.build(); 2758 } 2759 2760 // this will contain all the cells that we need to return. It's created later, if needed. 2761 List<ExtendedCellScannable> cellsToReturn = null; 2762 RegionScannersCloseCallBack closeCallBack = null; 2763 RpcCallContext context = RpcServer.getCurrentCall().orElse(null); 2764 Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = 2765 new HashMap<>(request.getRegionActionCount()); 2766 2767 for (RegionAction regionAction : request.getRegionActionList()) { 2768 OperationQuota quota; 2769 HRegion region; 2770 RegionSpecifier regionSpecifier = regionAction.getRegion(); 2771 regionActionResultBuilder.clear(); 2772 2773 try { 2774 region = getRegion(regionSpecifier); 2775 quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(), 2776 regionAction.hasCondition()); 2777 } catch (IOException e) { 2778 failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e); 2779 continue; // For this region it's a failure. 
2780 } 2781 2782 try { 2783 boolean rejectIfFromClient = shouldRejectRequestsFromClient(region); 2784 2785 if (regionAction.hasCondition()) { 2786 // We only allow replication in standby state and it will not set the atomic flag. 2787 if (rejectIfFromClient) { 2788 failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, 2789 new DoNotRetryIOException( 2790 region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state")); 2791 continue; 2792 } 2793 2794 try { 2795 ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = 2796 ClientProtos.ResultOrException.newBuilder(); 2797 if (regionAction.getActionCount() == 1) { 2798 CheckAndMutateResult result = 2799 checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner, 2800 regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement, context); 2801 regionActionResultBuilder.setProcessed(result.isSuccess()); 2802 resultOrExceptionOrBuilder.setIndex(0); 2803 if (result.getResult() != null) { 2804 resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult())); 2805 } 2806 2807 if (result.getMetrics() != null) { 2808 resultOrExceptionOrBuilder 2809 .setMetrics(ProtobufUtil.toQueryMetrics(result.getMetrics())); 2810 } 2811 2812 regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build()); 2813 } else { 2814 CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(), 2815 cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement); 2816 regionActionResultBuilder.setProcessed(result.isSuccess()); 2817 for (int i = 0; i < regionAction.getActionCount(); i++) { 2818 if (i == 0 && result.getResult() != null) { 2819 // Set the result of the Increment/Append operations to the first element of the 2820 // ResultOrException list 2821 resultOrExceptionOrBuilder.setIndex(i); 2822 regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder 2823 
.setResult(ProtobufUtil.toResult(result.getResult())).build()); 2824 continue; 2825 } 2826 // To unify the response format with doNonAtomicRegionMutation and read through 2827 // client's AsyncProcess we have to add an empty result instance per operation 2828 resultOrExceptionOrBuilder.clear(); 2829 resultOrExceptionOrBuilder.setIndex(i); 2830 regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build()); 2831 } 2832 } 2833 } catch (IOException e) { 2834 rpcServer.getMetrics().exception(e); 2835 // As it's an atomic operation with a condition, we may expect it's a global failure. 2836 regionActionResultBuilder.setException(ResponseConverter.buildException(e)); 2837 } 2838 } else if (regionAction.hasAtomic() && regionAction.getAtomic()) { 2839 // We only allow replication in standby state and it will not set the atomic flag. 2840 if (rejectIfFromClient) { 2841 failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, 2842 new DoNotRetryIOException( 2843 region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state")); 2844 continue; 2845 } 2846 try { 2847 doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(), 2848 cellScanner, nonceGroup, spaceQuotaEnforcement); 2849 regionActionResultBuilder.setProcessed(true); 2850 // We no longer use MultiResponse#processed. Instead, we use 2851 // RegionActionResult#processed. This is for backward compatibility for old clients. 2852 responseBuilder.setProcessed(true); 2853 } catch (IOException e) { 2854 rpcServer.getMetrics().exception(e); 2855 // As it's atomic, we may expect it's a global failure. 
2856 regionActionResultBuilder.setException(ResponseConverter.buildException(e)); 2857 } 2858 } else { 2859 if ( 2860 rejectIfFromClient && regionAction.getActionCount() > 0 2861 && !isReplicationRequest(regionAction.getAction(0)) 2862 ) { 2863 // fail if it is not a replication request 2864 failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, 2865 new DoNotRetryIOException( 2866 region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state")); 2867 continue; 2868 } 2869 // doNonAtomicRegionMutation manages the exception internally 2870 if (context != null && closeCallBack == null) { 2871 // An RpcCallBack that creates a list of scanners that needs to perform callBack 2872 // operation on completion of multiGets. 2873 // Set this only once 2874 closeCallBack = new RegionScannersCloseCallBack(); 2875 context.setCallBack(closeCallBack); 2876 } 2877 cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner, 2878 regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context, 2879 spaceQuotaEnforcement); 2880 } 2881 } finally { 2882 quota.close(); 2883 } 2884 2885 responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); 2886 ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics(); 2887 if (regionLoadStats != null) { 2888 regionStats.put(regionSpecifier, regionLoadStats); 2889 } 2890 } 2891 // Load the controller with the Cells to return. 
2892 if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) { 2893 controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(cellsToReturn)); 2894 } 2895 2896 MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder(); 2897 for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) { 2898 builder.addRegion(stat.getKey()); 2899 builder.addStat(stat.getValue()); 2900 } 2901 responseBuilder.setRegionStatistics(builder); 2902 return responseBuilder.build(); 2903 } 2904 2905 private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) { 2906 if (cellScanner == null) { 2907 return; 2908 } 2909 for (Action action : actions) { 2910 skipCellsForMutation(action, cellScanner); 2911 } 2912 } 2913 2914 private void skipCellsForMutation(Action action, CellScanner cellScanner) { 2915 if (cellScanner == null) { 2916 return; 2917 } 2918 try { 2919 if (action.hasMutation()) { 2920 MutationProto m = action.getMutation(); 2921 if (m.hasAssociatedCellCount()) { 2922 for (int i = 0; i < m.getAssociatedCellCount(); i++) { 2923 cellScanner.advance(); 2924 } 2925 } 2926 } 2927 } catch (IOException e) { 2928 // No need to handle these Individual Muatation level issue. Any way this entire RegionAction 2929 // marked as failed as we could not see the Region here. At client side the top level 2930 // RegionAction exception will be considered first. 2931 LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e); 2932 } 2933 } 2934 2935 /** 2936 * Mutate data in a table. 2937 * @param rpcc the RPC controller 2938 * @param request the mutate request 2939 */ 2940 @Override 2941 public MutateResponse mutate(final RpcController rpcc, final MutateRequest request) 2942 throws ServiceException { 2943 // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data. 2944 // It is also the conduit via which we pass back data. 
2945 HBaseRpcController controller = (HBaseRpcController) rpcc; 2946 CellScanner cellScanner = controller != null ? controller.cellScanner() : null; 2947 OperationQuota quota = null; 2948 RpcCallContext context = RpcServer.getCurrentCall().orElse(null); 2949 // Clear scanner so we are not holding on to reference across call. 2950 if (controller != null) { 2951 controller.setCellScanner(null); 2952 } 2953 try { 2954 checkOpen(); 2955 requestCount.increment(); 2956 rpcMutateRequestCount.increment(); 2957 HRegion region = getRegion(request.getRegion()); 2958 rejectIfInStandByState(region); 2959 MutateResponse.Builder builder = MutateResponse.newBuilder(); 2960 MutationProto mutation = request.getMutation(); 2961 if (!region.getRegionInfo().isMetaRegion()) { 2962 server.getMemStoreFlusher().reclaimMemStoreMemory(); 2963 } 2964 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; 2965 OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request); 2966 quota = getRpcQuotaManager().checkBatchQuota(region, operationType); 2967 ActivePolicyEnforcement spaceQuotaEnforcement = 2968 getSpaceQuotaManager().getActiveEnforcements(); 2969 2970 if (request.hasCondition()) { 2971 CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner, 2972 request.getCondition(), nonceGroup, spaceQuotaEnforcement, context); 2973 builder.setProcessed(result.isSuccess()); 2974 boolean clientCellBlockSupported = isClientCellBlockSupport(context); 2975 addResult(builder, result.getResult(), controller, clientCellBlockSupported); 2976 if (clientCellBlockSupported) { 2977 addSize(context, result.getResult()); 2978 } 2979 if (result.getMetrics() != null) { 2980 builder.setMetrics(ProtobufUtil.toQueryMetrics(result.getMetrics())); 2981 } 2982 } else { 2983 Result r = null; 2984 Boolean processed = null; 2985 MutationType type = mutation.getMutateType(); 2986 switch (type) { 2987 case APPEND: 2988 // TODO: this doesn't 
actually check anything.
            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
              context);
            break;
          case INCREMENT:
            // TODO: this doesn't actually check anything.
            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
              context);
            break;
          case PUT:
            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          case DELETE:
            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
            break;
          default:
            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
        }
        if (processed != null) {
          builder.setProcessed(processed);
        }
        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
        addResult(builder, r, controller, clientCellBlockSupported);
        if (clientCellBlockSupported) {
          addSize(context, r);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      // Best-effort probe: an IOException here may indicate filesystem trouble.
      server.checkFileSystem();
      throw new ServiceException(ie);
    } finally {
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Applies a single Put from the wire: converts the proto mutation, enforces the cell-size limit
   * and space-quota policy, charges the operation quota, executes the put against the region, and
   * records put latency in the region server metrics (when metrics are enabled).
   */
  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Put put = ProtobufUtil.toPut(mutation, cellScanner);
    checkCellSizeLimit(region, put);
    spaceQuota.getPolicyEnforcement(region).check(put);
    quota.addMutation(put);
    region.put(put);

    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updatePut(region, after - before);
    }
  }

  /**
   * Applies a single Delete from the wire; mirrors {@link #put} (cell-size check, space quota,
   * operation quota, execute, metrics) but for deletes.
   */
  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
    checkCellSizeLimit(region, delete);
    spaceQuota.getPolicyEnforcement(region).check(delete);
    quota.addMutation(delete);
    region.delete(delete);

    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      metricsRegionServer.updateDelete(region, after - before);
    }
  }

  /**
   * Executes a checkAndMutate. The coprocessor preCheckAndMutate hook may supply a result and
   * bypass the region operation entirely; otherwise the region performs the conditional mutation
   * and the postCheckAndMutate hook may transform the result. Latency (and block bytes scanned,
   * when an RpcCallContext is present) is recorded whether or not the condition matched.
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
    quota.addMutation((Mutation) checkAndMutate.getAction());

    CheckAndMutateResult result = null;
    if (region.getCoprocessorHost() != null) {
      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
    }
    if (result == null) {
      // Pre-hook did not bypass; let the region do the real conditional mutation.
      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
      if (region.getCoprocessorHost() != null) {
        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
      }
    }
    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      long blockBytesScanned =
        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
      metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned);

      MutationType type = mutation.getMutateType();
      switch (type) {
        case PUT:
          metricsRegionServer.updateCheckAndPut(region, after - before);
          break;
        case DELETE:
          metricsRegionServer.updateCheckAndDelete(region, after - before);
          break;
        default:
          break;
      }
    }
    return result;
  }

  // This is used to keep compatibility with old client implementations. Consider removing it if
  // we decide to drop support for clients that still send a close request to a region scanner
  // which has already been exhausted.
3106 @Deprecated 3107 private static final IOException SCANNER_ALREADY_CLOSED = new IOException() { 3108 3109 private static final long serialVersionUID = -4305297078988180130L; 3110 3111 @Override 3112 public synchronized Throwable fillInStackTrace() { 3113 return this; 3114 } 3115 }; 3116 3117 private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException { 3118 String scannerName = toScannerName(request.getScannerId()); 3119 RegionScannerHolder rsh = this.scanners.get(scannerName); 3120 if (rsh == null) { 3121 // just ignore the next or close request if scanner does not exists. 3122 Long lastCallSeq = closedScanners.getIfPresent(scannerName); 3123 if (lastCallSeq != null) { 3124 // Check the sequence number to catch if the last call was incorrectly retried. 3125 // The only allowed scenario is when the scanner is exhausted and one more scan 3126 // request arrives - in this case returning 0 rows is correct. 3127 if (request.hasNextCallSeq() && request.getNextCallSeq() != lastCallSeq + 1) { 3128 throw new OutOfOrderScannerNextException("Expected nextCallSeq for closed request: " 3129 + (lastCallSeq + 1) + " But the nextCallSeq got from client: " 3130 + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request)); 3131 } else { 3132 throw SCANNER_ALREADY_CLOSED; 3133 } 3134 } else { 3135 LOG.warn("Client tried to access missing scanner " + scannerName); 3136 throw new UnknownScannerException( 3137 "Unknown scanner '" + scannerName + "'. 
This can happen due to any of the following " 3138 + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " 3139 + "long wait between consecutive client checkins, c) Server may be closing down, " 3140 + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " 3141 + "possible fix would be increasing the value of" 3142 + "'hbase.client.scanner.timeout.period' configuration."); 3143 } 3144 } 3145 rejectIfInStandByState(rsh.r); 3146 RegionInfo hri = rsh.s.getRegionInfo(); 3147 // Yes, should be the same instance 3148 if (server.getOnlineRegion(hri.getRegionName()) != rsh.r) { 3149 String msg = "Region has changed on the scanner " + scannerName + ": regionName=" 3150 + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r; 3151 LOG.warn(msg + ", closing..."); 3152 scanners.remove(scannerName); 3153 try { 3154 rsh.s.close(); 3155 } catch (IOException e) { 3156 LOG.warn("Getting exception closing " + scannerName, e); 3157 } finally { 3158 try { 3159 server.getLeaseManager().cancelLease(scannerName); 3160 } catch (LeaseException e) { 3161 LOG.warn("Getting exception closing " + scannerName, e); 3162 } 3163 } 3164 throw new NotServingRegionException(msg); 3165 } 3166 return rsh; 3167 } 3168 3169 /** 3170 * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder 3171 * value. 3172 */ 3173 private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request, HRegion region, 3174 ScanResponse.Builder builder) throws IOException { 3175 rejectIfInStandByState(region); 3176 ClientProtos.Scan protoScan = request.getScan(); 3177 boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); 3178 Scan scan = ProtobufUtil.toScan(protoScan); 3179 // if the request doesn't set this, get the default region setting. 
3180 if (!isLoadingCfsOnDemandSet) { 3181 scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); 3182 } 3183 3184 if (!scan.hasFamilies()) { 3185 // Adding all families to scanner 3186 for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) { 3187 scan.addFamily(family); 3188 } 3189 } 3190 if (region.getCoprocessorHost() != null) { 3191 // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a 3192 // wrapper for the core created RegionScanner 3193 region.getCoprocessorHost().preScannerOpen(scan); 3194 } 3195 RegionScannerImpl coreScanner = region.getScanner(scan); 3196 Shipper shipper = coreScanner; 3197 RegionScanner scanner = coreScanner; 3198 try { 3199 if (region.getCoprocessorHost() != null) { 3200 scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); 3201 } 3202 } catch (Exception e) { 3203 // Although region coprocessor is for advanced users and they should take care of the 3204 // implementation to not damage the HBase system, closing the scanner on exception here does 3205 // not have any bad side effect, so let's do it 3206 scanner.close(); 3207 throw e; 3208 } 3209 long scannerId = scannerIdGenerator.generateNewScannerId(); 3210 builder.setScannerId(scannerId); 3211 builder.setMvccReadPoint(scanner.getMvccReadPoint()); 3212 builder.setTtl(scannerLeaseTimeoutPeriod); 3213 String scannerName = toScannerName(scannerId); 3214 3215 boolean fullRegionScan = 3216 !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region); 3217 3218 return new Pair<String, RegionScannerHolder>(scannerName, 3219 addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan)); 3220 } 3221 3222 /** 3223 * The returned String is used as key doing look up of outstanding Scanners in this Servers' 3224 * this.scanners, the Map of outstanding scanners and their current state. 3225 * @param scannerId A scanner long id. 
3226 * @return The long id as a String. 3227 */ 3228 private static String toScannerName(long scannerId) { 3229 return Long.toString(scannerId); 3230 } 3231 3232 private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh) 3233 throws OutOfOrderScannerNextException { 3234 // if nextCallSeq does not match throw Exception straight away. This needs to be 3235 // performed even before checking of Lease. 3236 // See HBASE-5974 3237 if (request.hasNextCallSeq()) { 3238 long callSeq = request.getNextCallSeq(); 3239 if (!rsh.incNextCallSeq(callSeq)) { 3240 throw new OutOfOrderScannerNextException( 3241 "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: " 3242 + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request)); 3243 } 3244 } 3245 } 3246 3247 private void addScannerLeaseBack(LeaseManager.Lease lease) { 3248 try { 3249 server.getLeaseManager().addLease(lease); 3250 } catch (LeaseStillHeldException e) { 3251 // should not happen as the scanner id is unique. 3252 throw new AssertionError(e); 3253 } 3254 } 3255 3256 // visible for testing only 3257 long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller, 3258 boolean allowHeartbeatMessages) { 3259 // Set the time limit to be half of the more restrictive timeout value (one of the 3260 // timeout values must be positive). In the event that both values are positive, the 3261 // more restrictive of the two is used to calculate the limit. 3262 if (allowHeartbeatMessages) { 3263 long now = EnvironmentEdgeManager.currentTime(); 3264 long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now); 3265 if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) { 3266 long timeLimitDelta; 3267 if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) { 3268 timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout); 3269 } else { 3270 timeLimitDelta = 3271 scannerLeaseTimeoutPeriod > 0 ? 
scannerLeaseTimeoutPeriod : remainingTimeout; 3272 } 3273 3274 // Use half of whichever timeout value was more restrictive... But don't allow 3275 // the time limit to be less than the allowable minimum (could cause an 3276 // immediate timeout before scanning any data). 3277 timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta); 3278 return now + timeLimitDelta; 3279 } 3280 } 3281 // Default value of timeLimit is negative to indicate no timeLimit should be 3282 // enforced. 3283 return -1L; 3284 } 3285 3286 private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) { 3287 long timeout; 3288 if (controller != null && controller.getCallTimeout() > 0) { 3289 timeout = controller.getCallTimeout(); 3290 } else if (rpcTimeout > 0) { 3291 timeout = rpcTimeout; 3292 } else { 3293 return -1; 3294 } 3295 if (call != null) { 3296 timeout -= (now - call.getReceiveTime()); 3297 } 3298 // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high. 3299 // return minimum value here in that case so we count this in calculating the final delta. 3300 return Math.max(minimumScanTimeLimitDelta, timeout); 3301 } 3302 3303 private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows, 3304 ScannerContext scannerContext, ScanResponse.Builder builder) { 3305 if (numOfCompleteRows >= limitOfRows) { 3306 if (LOG.isTraceEnabled()) { 3307 LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows 3308 + " scannerContext: " + scannerContext); 3309 } 3310 builder.setMoreResults(false); 3311 } 3312 } 3313 3314 // return whether we have more results in region. 
  /**
   * Runs the core scan loop for one client call: repeatedly invokes
   * RegionScanner#nextRaw until a size, time, batch or row limit is hit or the region is
   * exhausted, accumulating Results plus heartbeat/cursor/limit state into the response builder.
   * "More results in region" is reported via {@code builder.setMoreResultsInRegion}, not a return
   * value. Serverside scan metrics are updated in the finally block even on error.
   */
  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
    long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
    ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
    HRegion region = rsh.r;
    RegionScanner scanner = rsh.s;
    long maxResultSize;
    if (scanner.getMaxResultSize() > 0) {
      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
    } else {
      maxResultSize = maxQuotaResultSize;
    }
    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
    // arbitrary 32. TODO: keep record of general size of results being returned.
    ArrayList<Cell> values = new ArrayList<>(32);
    region.startRegionOperation(Operation.SCAN);
    long before = EnvironmentEdgeManager.currentTime();
    // Used to check if we've matched the row limit set on the Scan
    int numOfCompleteRows = 0;
    // Count of times we call nextRaw; can be > numOfCompleteRows.
    int numOfNextRawCalls = 0;
    try {
      int numOfResults = 0;
      synchronized (scanner) {
        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
        boolean clientHandlesPartials =
          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
        boolean clientHandlesHeartbeats =
          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();

        // On the server side we must ensure that the correct ordering of partial results is
        // returned to the client to allow them to properly reconstruct the partial results.
        // If the coprocessor host is adding to the result list, we cannot guarantee the
        // correct ordering of partial results and so we prevent partial results from being
        // formed.
        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
        boolean moreRows = false;

        // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
        // certain time threshold on the server. When the time threshold is exceeded, the
        // server stops the scan and sends back whatever Results it has accumulated within
        // that time period (may be empty). Since heartbeat messages have the potential to
        // create partial Results (in the event that the timeout occurs in the middle of a
        // row), we must only generate heartbeat messages when the client can handle both
        // heartbeats AND partials
        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;

        long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);

        final LimitScope sizeScope =
          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
        final LimitScope timeScope =
          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;

        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();

        // Configure with limits for this RPC. Set keep progress true since size progress
        // towards size limit should be kept between calls to nextRaw
        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
        // of heap size occupied by cells(being read). Cell data means its key and value parts.
        // maxQuotaResultSize - max results just from server side configuration and quotas, without
        // user's specified max. We use this for evaluating limits based on blocks (not cells).
        // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
        // cell or block size from maximum here so we adhere to total limits of request.
        // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
        // have accumulated block bytes. If not, this will be 0 for block size.
        long maxCellSize = maxResultSize;
        long maxBlockSize = maxQuotaResultSize;
        if (rpcCall != null) {
          maxBlockSize -= rpcCall.getBlockBytesScanned();
          maxCellSize -= rpcCall.getResponseCellSize();
        }

        contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
        contextBuilder.setBatchLimit(scanner.getBatch());
        contextBuilder.setTimeLimit(timeScope, timeLimit);
        contextBuilder.setTrackMetrics(trackMetrics);
        ScannerContext scannerContext = contextBuilder.build();
        boolean limitReached = false;
        long blockBytesScannedBefore = 0;
        while (numOfResults < maxResults) {
          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
          // batch limit is a limit on the number of cells per Result. Thus, if progress is
          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
          // reset the batch progress between nextRaw invocations since we don't want the
          // batch progress from previous calls to affect future calls
          scannerContext.setBatchProgress(0);
          assert values.isEmpty();

          // Collect values to be returned here
          moreRows = scanner.nextRaw(values, scannerContext);

          // Per-row delta of block bytes scanned, used for per-Result query metrics below.
          long blockBytesScanned = scannerContext.getBlockSizeProgress() - blockBytesScannedBefore;
          blockBytesScannedBefore = scannerContext.getBlockSizeProgress();

          if (rpcCall == null) {
            // When there is no RpcCallContext,copy EC to heap, then the scanner would close,
            // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
            // buffers.See more details in HBASE-26036.
            CellUtil.cloneIfNecessary(values);
          }
          numOfNextRawCalls++;

          if (!values.isEmpty()) {
            if (limitOfRows > 0) {
              // First we need to check if the last result is partial and we have a row change. If
              // so then we need to increase the numOfCompleteRows.
              if (results.isEmpty()) {
                if (
                  rsh.rowOfLastPartialResult != null
                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              } else {
                Result lastResult = results.get(results.size() - 1);
                if (
                  lastResult.mayHaveMoreCellsInRow()
                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              }
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);

            if (request.getScan().getQueryMetricsEnabled()) {
              builder.addQueryMetrics(ClientProtos.QueryMetrics.newBuilder()
                .setBlockBytesScanned(blockBytesScanned).build());
            }

            results.add(r);
            numOfResults++;
            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
              numOfCompleteRows++;
              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
          } else if (!moreRows && !results.isEmpty()) {
            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
            // last result is false. Otherwise it's possible that: the first nextRaw returned
            // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
            // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
            // return with moreRows=false, which means moreResultsInRegion would be false, it will
            // be a contradictory state (HBASE-21206).
            int lastIdx = results.size() - 1;
            Result r = results.get(lastIdx);
            if (r.mayHaveMoreCellsInRow()) {
              results.set(lastIdx, ClientInternalHelper.createResult(
                ClientInternalHelper.getExtendedRawCells(r), r.getExists(), r.isStale(), false));
            }
          }
          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
          boolean resultsLimitReached = numOfResults >= maxResults;
          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

          if (limitReached || !moreRows) {
            // With block size limit, we may exceed size limit without collecting any results.
            // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
            // or cursor if results were collected, for example for cell size or heap size limits.
            boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
            // We only want to mark a ScanResponse as a heartbeat message in the event that
            // there are more values to be read server side. If there aren't more values,
            // marking it as a heartbeat is wasteful because the client will need to issue
            // another ScanRequest only to realize that they already have all the values
            if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
              // Heartbeat messages occur when the time limit has been reached, or size limit has
              // been reached before collecting any results. This can happen for heavily filtered
              // scans which scan over too many blocks.
              builder.setHeartbeatMessage(true);
              if (rsh.needCursor) {
                Cell cursorCell = scannerContext.getLastPeekedCell();
                if (cursorCell != null) {
                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
                }
              }
            }
            break;
          }
          values.clear();
        }
        if (rpcCall != null) {
          rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
        }
        builder.setMoreResultsInRegion(moreRows);
        // Check to see if the client requested that we track metrics server side. If the
        // client requested metrics, retrieve the metrics from the scanner context.
        if (trackMetrics) {
          // rather than increment yet another counter in StoreScanner, just set the value here
          // from block size progress before writing into the response
          scannerContext.getMetrics().countOfBlockBytesScanned
            .set(scannerContext.getBlockSizeProgress());
          if (rpcCall != null) {
            scannerContext.getMetrics().fsReadTime.set(rpcCall.getFsReadTime());
          }
          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();

          for (Entry<String, Long> entry : metrics.entrySet()) {
            pairBuilder.setName(entry.getKey());
            pairBuilder.setValue(entry.getValue());
            metricBuilder.addMetrics(pairBuilder.build());
          }

          builder.setScanMetrics(metricBuilder.build());
        }
      }
    } finally {
      region.closeRegionOperation();
      // Update serverside metrics, even on error.
      long end = EnvironmentEdgeManager.currentTime();

      long responseCellSize = 0;
      long blockBytesScanned = 0;
      if (rpcCall != null) {
        responseCellSize = rpcCall.getResponseCellSize();
        blockBytesScanned = rpcCall.getBlockBytesScanned();
        rsh.updateBlockBytesScanned(blockBytesScanned);
      }
      region.getMetrics().updateScan();
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned);
        metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls);
      }
    }
    // coprocessor postNext hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
    }
  }

  /**
   * Scan data in a table.
   * @param controller the RPC controller
   * @param request    the scan request
   */
  @Override
  public ScanResponse scan(final RpcController controller, final ScanRequest request)
    throws ServiceException {
    if (controller != null && !(controller instanceof HBaseRpcController)) {
      throw new UnsupportedOperationException(
        "We only do " + "HBaseRpcControllers!
FIX IF A PROBLEM: " + controller);
    }
    if (!request.hasScannerId() && !request.hasScan()) {
      throw new ServiceException(
        new DoNotRetryIOException("Missing required input: scannerId or scan"));
    }
    try {
      checkOpen();
    } catch (IOException e) {
      // Server is going down; best-effort cancel the lease so the scanner resources free up.
      if (request.hasScannerId()) {
        String scannerName = toScannerName(request.getScannerId());
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Server shutting down and client tried to access missing scanner " + scannerName);
        }
        final LeaseManager leaseManager = server.getLeaseManager();
        if (leaseManager != null) {
          try {
            leaseManager.cancelLease(scannerName);
          } catch (LeaseException le) {
            // No problem, ignore
            if (LOG.isTraceEnabled()) {
              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
            }
          }
        }
      }
      throw new ServiceException(e);
    }
    requestCount.increment();
    rpcScanRequestCount.increment();
    RegionScannerContext rsx;
    ScanResponse.Builder builder = ScanResponse.newBuilder();
    try {
      rsx = checkQuotaAndGetRegionScannerContext(request, builder);
    } catch (IOException e) {
      if (e == SCANNER_ALREADY_CLOSED) {
        // Now we will close scanner automatically if there are no more results for this region but
        // the old client will still send a close request to us. Just ignore it and return.
        return builder.build();
      }
      throw new ServiceException(e);
    }
    String scannerName = rsx.scannerName;
    RegionScannerHolder rsh = rsx.holder;
    OperationQuota quota = rsx.quota;
    if (rsh.fullRegionScan) {
      rpcFullScanRequestCount.increment();
    }
    HRegion region = rsh.r;
    LeaseManager.Lease lease;
    try {
      // Remove lease while its being processed in server; protects against case
      // where processing of request takes > lease expiration time. or null if none found.
      lease = server.getLeaseManager().removeLease(scannerName);
    } catch (LeaseException e) {
      throw new ServiceException(e);
    }
    if (request.hasRenew() && request.getRenew()) {
      // add back and return
      addScannerLeaseBack(lease);
      try {
        checkScanNextCallSeq(request, rsh);
      } catch (OutOfOrderScannerNextException e) {
        throw new ServiceException(e);
      }
      return builder.build();
    }
    try {
      checkScanNextCallSeq(request, rsh);
    } catch (OutOfOrderScannerNextException e) {
      addScannerLeaseBack(lease);
      throw new ServiceException(e);
    }
    // Now we have increased the next call sequence. If we give client an error, the retry will
    // never success. So we'd better close the scanner and return a DoNotRetryIOException to client
    // and then client will try to open a new scanner.
    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
    int rows; // this is scan.getCaching
    if (request.hasNumberOfRows()) {
      rows = request.getNumberOfRows();
    } else {
      rows = closeScanner ? 0 : 1;
    }
    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
    // now let's do the real scan.
    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
    RegionScanner scanner = rsh.s;
    // this is the limit of rows for this scan, if we the number of rows reach this value, we will
    // close the scanner.
    int limitOfRows;
    if (request.hasLimitOfRows()) {
      limitOfRows = request.getLimitOfRows();
    } else {
      limitOfRows = -1;
    }
    boolean scannerClosed = false;
    try {
      List<Result> results = new ArrayList<>(Math.min(rows, 512));
      if (rows > 0) {
        boolean done = false;
        // Call coprocessor. Get region info from scanner.
        if (region.getCoprocessorHost() != null) {
          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
          if (!results.isEmpty()) {
            for (Result r : results) {
              // add cell size from CP results so we can track response size and update limits
              // when calling scan below if !done. We'll also have tracked block size if the CP
              // got results from hbase, since StoreScanner tracks that for all calls automatically.
              addSize(rpcCall, r);
            }
          }
          if (bypass != null && bypass.booleanValue()) {
            done = true;
          }
        }
        if (!done) {
          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
            results, builder, rpcCall);
        } else {
          builder.setMoreResultsInRegion(!results.isEmpty());
        }
      } else {
        // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
        builder.setMoreResultsInRegion(true);
      }

      quota.addScanResult(results);
      addResults(builder, results, (HBaseRpcController) controller,
        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
        isClientCellBlockSupport(rpcCall));
      if (scanner.isFilterDone() && results.isEmpty()) {
        // If the scanner's filter - if any - is done with the scan
        // only set moreResults to false if the results is empty. This is used to keep compatible
        // with the old scan implementation where we just ignore the returned results if moreResults
        // is false. Can remove the isEmpty check after we get rid of the old implementation.
        builder.setMoreResults(false);
      }
      // Later we may close the scanner depending on this flag so here we need to make sure that we
      // have already set this flag.
      assert builder.hasMoreResultsInRegion();
      // we only set moreResults to false in the above code, so set it to true if we haven't set it
      // yet.
      if (!builder.hasMoreResults()) {
        builder.setMoreResults(true);
      }
      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
        // Record the last cell of the last result if it is a partial result
        // We need this to calculate the complete rows we have returned to client as the
        // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
        // current row. We may filter out all the remaining cells for the current row and just
        // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
        // check for row change.
        Result lastResult = results.get(results.size() - 1);
        if (lastResult.mayHaveMoreCellsInRow()) {
          rsh.rowOfLastPartialResult = lastResult.getRow();
        } else {
          rsh.rowOfLastPartialResult = null;
        }
      }
      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
        scannerClosed = true;
        closeScanner(region, scanner, scannerName, rpcCall, false);
      }

      // There's no point returning to a timed out client. Throwing ensures scanner is closed
      if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) {
        throw new TimeoutIOException("Client deadline exceeded, cannot return results");
      }

      return builder.build();
    } catch (IOException e) {
      try {
        // scanner is closed here
        scannerClosed = true;
        // The scanner state might be left in a dirty state, so we will tell the Client to
        // fail this RPC and close the scanner while opening up another one from the start of
        // row that the client has last seen.
        closeScanner(region, scanner, scannerName, rpcCall, true);

        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
        // used in two different semantics.
        // (1) The first is to close the client scanner and bubble up the exception all the way
        // to the application. This is preferred when the exception is really un-recoverable
        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
        // bucket usually.
        // (2) Second semantics is to close the current region scanner only, but continue the
        // client scanner by overriding the exception. This is usually UnknownScannerException,
        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
        // application-level ClientScanner has to continue without bubbling up the exception to
        // the client. See ClientScanner code to see how it deals with these special exceptions.
        if (e instanceof DoNotRetryIOException) {
          throw e;
        }

        // If it is a FileNotFoundException, wrap as a
        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
        if (e instanceof FileNotFoundException) {
          throw new DoNotRetryIOException(e);
        }

        // We closed the scanner already. Instead of throwing the IOException, and client
        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
        // a special exception to save an RPC.
        // NOTE(review): rpcCall is dereferenced without a null check here while other paths treat
        // it as nullable — presumably scan RPCs always run inside a call context; confirm.
        if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
          // 1.4.0+ clients know how to handle
          throw new ScannerResetException("Scanner is closed on the server-side", e);
        } else {
          // older clients do not know about SRE. Just throw USE, which they will handle
          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
            + " scanner state for clients older than 1.3.", e);
        }
      } catch (IOException ioe) {
        throw new ServiceException(ioe);
      }
    } finally {
      if (!scannerClosed) {
        // Adding resets expiration time on lease.
        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
        if (rpcCall != null) {
          rpcCall.setCallBack(rsh.shippedCallback);
        } else {
          // If context is null,here we call rsh.shippedCallback directly to reuse the logic in
          // rsh.shippedCallback to release the internal resources in rsh,and lease is also added
          // back to regionserver's LeaseManager in rsh.shippedCallback.
          runShippedCallback(rsh);
        }
      }
      quota.close();
    }
  }

  /**
   * Runs the holder's shipped callback directly (used when there is no RpcCall context to hang
   * the callback on), wrapping any IOException as a ServiceException.
   */
  private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
    assert rsh.shippedCallback != null;
    try {
      rsh.shippedCallback.run();
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
  }

  /**
   * Closes (or schedules close of) a region scanner; coprocessor preScannerClose may bypass the
   * actual close. On a clean close the last call sequence is remembered in closedScanners so a
   * late retry can be answered gracefully.
   */
  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
    RpcCallContext context, boolean isError) throws IOException {
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preScannerClose(scanner)) {
        // bypass the actual close.
3817 return; 3818 } 3819 } 3820 RegionScannerHolder rsh = scanners.remove(scannerName); 3821 if (rsh != null) { 3822 if (context != null) { 3823 context.setCallBack(rsh.closeCallBack); 3824 } else { 3825 rsh.s.close(); 3826 } 3827 if (region.getCoprocessorHost() != null) { 3828 region.getCoprocessorHost().postScannerClose(scanner); 3829 } 3830 if (!isError) { 3831 closedScanners.put(scannerName, rsh.getNextCallSeq()); 3832 } 3833 } 3834 } 3835 3836 @Override 3837 public CoprocessorServiceResponse execRegionServerService(RpcController controller, 3838 CoprocessorServiceRequest request) throws ServiceException { 3839 rpcPreCheck("execRegionServerService"); 3840 return server.execRegionServerService(controller, request); 3841 } 3842 3843 @Override 3844 public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller, 3845 GetSpaceQuotaSnapshotsRequest request) throws ServiceException { 3846 try { 3847 final RegionServerSpaceQuotaManager manager = server.getRegionServerSpaceQuotaManager(); 3848 final GetSpaceQuotaSnapshotsResponse.Builder builder = 3849 GetSpaceQuotaSnapshotsResponse.newBuilder(); 3850 if (manager != null) { 3851 final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots(); 3852 for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) { 3853 builder.addSnapshots(TableQuotaSnapshot.newBuilder() 3854 .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey())) 3855 .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build()); 3856 } 3857 } 3858 return builder.build(); 3859 } catch (Exception e) { 3860 throw new ServiceException(e); 3861 } 3862 } 3863 3864 @Override 3865 public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller, 3866 ClearRegionBlockCacheRequest request) throws ServiceException { 3867 try { 3868 rpcPreCheck("clearRegionBlockCache"); 3869 ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder(); 3870 
CacheEvictionStatsBuilder stats = CacheEvictionStats.builder(); 3871 server.getRegionServerCoprocessorHost().preClearRegionBlockCache(); 3872 List<HRegion> regions = getRegions(request.getRegionList(), stats); 3873 for (HRegion region : regions) { 3874 try { 3875 stats = stats.append(this.server.clearRegionBlockCache(region)); 3876 } catch (Exception e) { 3877 stats.addException(region.getRegionInfo().getRegionName(), e); 3878 } 3879 } 3880 stats.withMaxCacheSize(server.getBlockCache().map(BlockCache::getMaxSize).orElse(0L)); 3881 server.getRegionServerCoprocessorHost().postClearRegionBlockCache(stats.build()); 3882 return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build(); 3883 } catch (IOException e) { 3884 throw new ServiceException(e); 3885 } 3886 } 3887 3888 private void executeOpenRegionProcedures(OpenRegionRequest request, 3889 Map<TableName, TableDescriptor> tdCache) { 3890 long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1; 3891 long initiatingMasterActiveTime = 3892 request.hasInitiatingMasterActiveTime() ? request.getInitiatingMasterActiveTime() : -1; 3893 for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) { 3894 RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion()); 3895 TableName tableName = regionInfo.getTable(); 3896 TableDescriptor tableDesc = tdCache.get(tableName); 3897 if (tableDesc == null) { 3898 try { 3899 tableDesc = server.getTableDescriptors().get(regionInfo.getTable()); 3900 } catch (IOException e) { 3901 // Here we do not fail the whole method since we also need deal with other 3902 // procedures, and we can not ignore this one, so we still schedule a 3903 // AssignRegionHandler and it will report back to master if we still can not get the 3904 // TableDescriptor. 
3905 LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler", 3906 regionInfo.getTable(), e); 3907 } 3908 if (tableDesc != null) { 3909 tdCache.put(tableName, tableDesc); 3910 } 3911 } 3912 if (regionOpenInfo.getFavoredNodesCount() > 0) { 3913 server.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(), 3914 regionOpenInfo.getFavoredNodesList()); 3915 } 3916 long procId = regionOpenInfo.getOpenProcId(); 3917 if (server.submitRegionProcedure(procId)) { 3918 server.getExecutorService().submit(AssignRegionHandler.create(server, regionInfo, procId, 3919 tableDesc, masterSystemTime, initiatingMasterActiveTime)); 3920 } 3921 } 3922 } 3923 3924 private void executeCloseRegionProcedures(CloseRegionRequest request) { 3925 String encodedName; 3926 long initiatingMasterActiveTime = 3927 request.hasInitiatingMasterActiveTime() ? request.getInitiatingMasterActiveTime() : -1; 3928 try { 3929 encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion()); 3930 } catch (DoNotRetryIOException e) { 3931 throw new UncheckedIOException("Should not happen", e); 3932 } 3933 ServerName destination = request.hasDestinationServer() 3934 ? 
ProtobufUtil.toServerName(request.getDestinationServer()) 3935 : null; 3936 long procId = request.getCloseProcId(); 3937 boolean evictCache = request.getEvictCache(); 3938 if (server.submitRegionProcedure(procId)) { 3939 server.getExecutorService().submit(UnassignRegionHandler.create(server, encodedName, procId, 3940 false, destination, evictCache, initiatingMasterActiveTime)); 3941 } 3942 } 3943 3944 private void executeProcedures(RemoteProcedureRequest request) { 3945 RSProcedureCallable callable; 3946 try { 3947 callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class) 3948 .getDeclaredConstructor().newInstance(); 3949 } catch (Exception e) { 3950 LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(), 3951 request.getProcId(), e); 3952 server.remoteProcedureComplete(request.getProcId(), request.getInitiatingMasterActiveTime(), 3953 e); 3954 return; 3955 } 3956 callable.init(request.getProcData().toByteArray(), server); 3957 LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId()); 3958 server.executeProcedure(request.getProcId(), request.getInitiatingMasterActiveTime(), callable); 3959 } 3960 3961 @Override 3962 @QosPriority(priority = HConstants.ADMIN_QOS) 3963 public ExecuteProceduresResponse executeProcedures(RpcController controller, 3964 ExecuteProceduresRequest request) throws ServiceException { 3965 try { 3966 checkOpen(); 3967 throwOnWrongStartCode(request); 3968 server.getRegionServerCoprocessorHost().preExecuteProcedures(); 3969 if (request.getOpenRegionCount() > 0) { 3970 // Avoid reading from the TableDescritor every time(usually it will read from the file 3971 // system) 3972 Map<TableName, TableDescriptor> tdCache = new HashMap<>(); 3973 request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache)); 3974 } 3975 if (request.getCloseRegionCount() > 0) { 3976 request.getCloseRegionList().forEach(this::executeCloseRegionProcedures); 
3977 } 3978 if (request.getProcCount() > 0) { 3979 request.getProcList().forEach(this::executeProcedures); 3980 } 3981 server.getRegionServerCoprocessorHost().postExecuteProcedures(); 3982 return ExecuteProceduresResponse.getDefaultInstance(); 3983 } catch (IOException e) { 3984 throw new ServiceException(e); 3985 } 3986 } 3987 3988 @Override 3989 public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller, 3990 GetAllBootstrapNodesRequest request) throws ServiceException { 3991 GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder(); 3992 server.getBootstrapNodes() 3993 .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server))); 3994 return builder.build(); 3995 } 3996 3997 private void setReloadableGuardrails(Configuration conf) { 3998 rowSizeWarnThreshold = 3999 conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT); 4000 rejectRowsWithSizeOverThreshold = 4001 conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD); 4002 maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 4003 HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); 4004 } 4005 4006 @Override 4007 public void onConfigurationChange(Configuration conf) { 4008 super.onConfigurationChange(conf); 4009 setReloadableGuardrails(conf); 4010 } 4011 4012 @Override 4013 public GetCachedFilesListResponse getCachedFilesList(RpcController controller, 4014 GetCachedFilesListRequest request) throws ServiceException { 4015 GetCachedFilesListResponse.Builder responseBuilder = GetCachedFilesListResponse.newBuilder(); 4016 List<String> fullyCachedFiles = new ArrayList<>(); 4017 server.getBlockCache().flatMap(BlockCache::getFullyCachedFiles).ifPresent(fcf -> { 4018 fullyCachedFiles.addAll(fcf.keySet()); 4019 }); 4020 return responseBuilder.addAllCachedFiles(fullyCachedFiles).build(); 4021 } 4022 4023 RegionScannerContext 
checkQuotaAndGetRegionScannerContext(ScanRequest request, 4024 ScanResponse.Builder builder) throws IOException { 4025 if (request.hasScannerId()) { 4026 // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000 4027 // for more details. 4028 long scannerId = request.getScannerId(); 4029 builder.setScannerId(scannerId); 4030 String scannerName = toScannerName(scannerId); 4031 RegionScannerHolder rsh = getRegionScanner(request); 4032 OperationQuota quota = 4033 getRpcQuotaManager().checkScanQuota(rsh.r, request, maxScannerResultSize, 4034 rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference()); 4035 return new RegionScannerContext(scannerName, rsh, quota); 4036 } 4037 4038 HRegion region = getRegion(request.getRegion()); 4039 OperationQuota quota = 4040 getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize, 0L, 0L); 4041 Pair<String, RegionScannerHolder> pair = newRegionScanner(request, region, builder); 4042 return new RegionScannerContext(pair.getFirst(), pair.getSecond(), quota); 4043 } 4044}