001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.UncheckedIOException; 024import java.lang.reflect.InvocationTargetException; 025import java.lang.reflect.Method; 026import java.net.BindException; 027import java.net.InetAddress; 028import java.net.InetSocketAddress; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.Collections; 032import java.util.HashMap; 033import java.util.Iterator; 034import java.util.List; 035import java.util.Map; 036import java.util.Map.Entry; 037import java.util.NavigableMap; 038import java.util.Optional; 039import java.util.Set; 040import java.util.TreeSet; 041import java.util.concurrent.ConcurrentHashMap; 042import java.util.concurrent.ConcurrentMap; 043import java.util.concurrent.TimeUnit; 044import java.util.concurrent.atomic.AtomicBoolean; 045import java.util.concurrent.atomic.AtomicLong; 046import java.util.concurrent.atomic.LongAdder; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.fs.FileSystem; 
049import org.apache.hadoop.fs.Path; 050import org.apache.hadoop.hbase.CacheEvictionStats; 051import org.apache.hadoop.hbase.CacheEvictionStatsBuilder; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.CellScannable; 054import org.apache.hadoop.hbase.CellScanner; 055import org.apache.hadoop.hbase.CellUtil; 056import org.apache.hadoop.hbase.DoNotRetryIOException; 057import org.apache.hadoop.hbase.DroppedSnapshotException; 058import org.apache.hadoop.hbase.HBaseIOException; 059import org.apache.hadoop.hbase.HConstants; 060import org.apache.hadoop.hbase.HRegionLocation; 061import org.apache.hadoop.hbase.MultiActionResultTooLarge; 062import org.apache.hadoop.hbase.NotServingRegionException; 063import org.apache.hadoop.hbase.PrivateCellUtil; 064import org.apache.hadoop.hbase.RegionTooBusyException; 065import org.apache.hadoop.hbase.Server; 066import org.apache.hadoop.hbase.ServerName; 067import org.apache.hadoop.hbase.TableName; 068import org.apache.hadoop.hbase.UnknownScannerException; 069import org.apache.hadoop.hbase.client.Append; 070import org.apache.hadoop.hbase.client.CheckAndMutate; 071import org.apache.hadoop.hbase.client.CheckAndMutateResult; 072import org.apache.hadoop.hbase.client.ConnectionUtils; 073import org.apache.hadoop.hbase.client.Delete; 074import org.apache.hadoop.hbase.client.Durability; 075import org.apache.hadoop.hbase.client.Get; 076import org.apache.hadoop.hbase.client.Increment; 077import org.apache.hadoop.hbase.client.Mutation; 078import org.apache.hadoop.hbase.client.OperationWithAttributes; 079import org.apache.hadoop.hbase.client.Put; 080import org.apache.hadoop.hbase.client.RegionInfo; 081import org.apache.hadoop.hbase.client.RegionReplicaUtil; 082import org.apache.hadoop.hbase.client.Result; 083import org.apache.hadoop.hbase.client.Row; 084import org.apache.hadoop.hbase.client.Scan; 085import org.apache.hadoop.hbase.client.TableDescriptor; 086import org.apache.hadoop.hbase.client.VersionInfoUtil; 087import 
org.apache.hadoop.hbase.conf.ConfigurationObserver; 088import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 089import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 090import org.apache.hadoop.hbase.exceptions.ScannerResetException; 091import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 092import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 093import org.apache.hadoop.hbase.io.ByteBuffAllocator; 094import org.apache.hadoop.hbase.io.hfile.BlockCache; 095import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; 096import org.apache.hadoop.hbase.ipc.HBaseRpcController; 097import org.apache.hadoop.hbase.ipc.PriorityFunction; 098import org.apache.hadoop.hbase.ipc.QosPriority; 099import org.apache.hadoop.hbase.ipc.RpcCall; 100import org.apache.hadoop.hbase.ipc.RpcCallContext; 101import org.apache.hadoop.hbase.ipc.RpcCallback; 102import org.apache.hadoop.hbase.ipc.RpcScheduler; 103import org.apache.hadoop.hbase.ipc.RpcServer; 104import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 105import org.apache.hadoop.hbase.ipc.RpcServerFactory; 106import org.apache.hadoop.hbase.ipc.RpcServerInterface; 107import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 108import org.apache.hadoop.hbase.ipc.ServerRpcController; 109import org.apache.hadoop.hbase.log.HBaseMarkers; 110import org.apache.hadoop.hbase.master.HMaster; 111import org.apache.hadoop.hbase.master.MasterRpcServices; 112import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; 113import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; 114import org.apache.hadoop.hbase.namequeues.RpcLogDetails; 115import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; 116import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; 117import org.apache.hadoop.hbase.net.Address; 118import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 119import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; 
120import org.apache.hadoop.hbase.quotas.OperationQuota; 121import org.apache.hadoop.hbase.quotas.QuotaUtil; 122import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 123import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 124import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 125import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; 126import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease; 127import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException; 128import org.apache.hadoop.hbase.regionserver.Region.Operation; 129import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 130import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 131import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler; 132import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; 133import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; 134import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; 135import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; 136import org.apache.hadoop.hbase.security.Superusers; 137import org.apache.hadoop.hbase.security.User; 138import org.apache.hadoop.hbase.security.access.AccessChecker; 139import org.apache.hadoop.hbase.security.access.NoopAccessChecker; 140import org.apache.hadoop.hbase.security.access.Permission; 141import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; 142import org.apache.hadoop.hbase.util.Bytes; 143import org.apache.hadoop.hbase.util.DNS; 144import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 145import org.apache.hadoop.hbase.util.Pair; 146import org.apache.hadoop.hbase.util.ReservoirSample; 147import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 148import org.apache.hadoop.hbase.wal.WAL; 149import org.apache.hadoop.hbase.wal.WALEdit; 150import org.apache.hadoop.hbase.wal.WALKey; 151import 
org.apache.hadoop.hbase.wal.WALSplitUtil; 152import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 153import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 154import org.apache.yetus.audience.InterfaceAudience; 155import org.apache.zookeeper.KeeperException; 156import org.slf4j.Logger; 157import org.slf4j.LoggerFactory; 158 159import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 160import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 161import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 162import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 163import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 164import org.apache.hbase.thirdparty.com.google.protobuf.Message; 165import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 166import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 167import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 168import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 169import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 170 171import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 172import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 173import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest; 180import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; 
201import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 215import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 221import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 236import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 241import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 242import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 243import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 244import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 245import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 246import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 247import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 248import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 249import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 250import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 251import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 252import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 253import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 254import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 255import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 256import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; 257import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; 258import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 259import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 260import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics; 261import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 262import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 263import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; 264import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 265import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; 266import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest; 267import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse; 268import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest; 269import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse; 270import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest; 271import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse; 272import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest; 273import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse; 274import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry; 275import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest; 276import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse; 277import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; 278import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 279import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 280import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 281import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 282 283/** 284 * Implements the regionserver RPC services. 
285 */ 286@InterfaceAudience.Private 287@SuppressWarnings("deprecation") 288public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.BlockingInterface, 289 ClientService.BlockingInterface, ClientMetaService.BlockingInterface, 290 BootstrapNodeService.BlockingInterface, PriorityFunction, ConfigurationObserver { 291 protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class); 292 293 /** RPC scheduler to use for the region server. */ 294 public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = 295 "hbase.region.server.rpc.scheduler.factory.class"; 296 297 /** RPC scheduler to use for the master. */ 298 public static final String MASTER_RPC_SCHEDULER_FACTORY_CLASS = 299 "hbase.master.rpc.scheduler.factory.class"; 300 301 /** 302 * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This 303 * configuration exists to prevent the scenario where a time limit is specified to be so 304 * restrictive that the time limit is reached immediately (before any cells are scanned). 
305 */ 306 private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 307 "hbase.region.server.rpc.minimum.scan.time.limit.delta"; 308 /** 309 * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA} 310 */ 311 static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10; 312 313 /** 314 * Whether to reject rows with size > threshold defined by 315 * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME} 316 */ 317 private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD = 318 "hbase.rpc.rows.size.threshold.reject"; 319 320 /** 321 * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD} 322 */ 323 private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false; 324 325 public static final String CLIENT_BOOTSTRAP_NODE_LIMIT = "hbase.client.bootstrap.node.limit"; 326 327 public static final int DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT = 10; 328 329 // Request counter. (Includes requests that are not serviced by regions.) 330 // Count only once for requests with multiple actions like multi/caching-scan/replayBatch 331 final LongAdder requestCount = new LongAdder(); 332 333 // Request counter for rpc get 334 final LongAdder rpcGetRequestCount = new LongAdder(); 335 336 // Request counter for rpc scan 337 final LongAdder rpcScanRequestCount = new LongAdder(); 338 339 // Request counter for scans that might end up in full scans 340 final LongAdder rpcFullScanRequestCount = new LongAdder(); 341 342 // Request counter for rpc multi 343 final LongAdder rpcMultiRequestCount = new LongAdder(); 344 345 // Request counter for rpc mutate 346 final LongAdder rpcMutateRequestCount = new LongAdder(); 347 348 // Server to handle client requests. 
349 final RpcServerInterface rpcServer; 350 final InetSocketAddress isa; 351 352 protected final HRegionServer regionServer; 353 private volatile long maxScannerResultSize; 354 355 // The reference to the priority extraction function 356 private final PriorityFunction priority; 357 358 private ScannerIdGenerator scannerIdGenerator; 359 private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>(); 360 // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients 361 // which may send next or close request to a region scanner which has already been exhausted. The 362 // entries will be removed automatically after scannerLeaseTimeoutPeriod. 363 private final Cache<String, String> closedScanners; 364 /** 365 * The lease timeout period for client scanners (milliseconds). 366 */ 367 private final int scannerLeaseTimeoutPeriod; 368 369 /** 370 * The RPC timeout period (milliseconds) 371 */ 372 private final int rpcTimeout; 373 374 /** 375 * The minimum allowable delta to use for the scan limit 376 */ 377 private final long minimumScanTimeLimitDelta; 378 379 /** 380 * Row size threshold for multi requests above which a warning is logged 381 */ 382 private volatile int rowSizeWarnThreshold; 383 /* 384 * Whether we should reject requests with very high no of rows i.e. beyond threshold defined by 385 * rowSizeWarnThreshold 386 */ 387 private volatile boolean rejectRowsWithSizeOverThreshold; 388 389 final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false); 390 391 private AccessChecker accessChecker; 392 private ZKPermissionWatcher zkPermissionWatcher; 393 394 /** 395 * Services launched in RSRpcServices. By default they are on but you can use the below booleans 396 * to selectively enable/disable these services (Rare is the case where you would ever turn off 397 * one or the other). 
398 */ 399 public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG = 400 "hbase.regionserver.admin.executorService"; 401 public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG = 402 "hbase.regionserver.client.executorService"; 403 public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG = 404 "hbase.regionserver.client.meta.executorService"; 405 public static final String REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG = 406 "hbase.regionserver.bootstrap.nodes.executorService"; 407 408 /** 409 * An Rpc callback for closing a RegionScanner. 410 */ 411 private static final class RegionScannerCloseCallBack implements RpcCallback { 412 413 private final RegionScanner scanner; 414 415 public RegionScannerCloseCallBack(RegionScanner scanner) { 416 this.scanner = scanner; 417 } 418 419 @Override 420 public void run() throws IOException { 421 this.scanner.close(); 422 } 423 } 424 425 /** 426 * An Rpc callback for doing shipped() call on a RegionScanner. 427 */ 428 private class RegionScannerShippedCallBack implements RpcCallback { 429 private final String scannerName; 430 private final Shipper shipper; 431 private final Lease lease; 432 433 public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) { 434 this.scannerName = scannerName; 435 this.shipper = shipper; 436 this.lease = lease; 437 } 438 439 @Override 440 public void run() throws IOException { 441 this.shipper.shipped(); 442 // We're done. On way out re-add the above removed lease. The lease was temp removed for this 443 // Rpc call and we are at end of the call now. Time to add it back. 444 if (scanners.containsKey(scannerName)) { 445 if (lease != null) { 446 regionServer.getLeaseManager().addLease(lease); 447 } 448 } 449 } 450 } 451 452 /** 453 * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on 454 * completion of multiGets. 
455 */ 456 static class RegionScannersCloseCallBack implements RpcCallback { 457 private final List<RegionScanner> scanners = new ArrayList<>(); 458 459 public void addScanner(RegionScanner scanner) { 460 this.scanners.add(scanner); 461 } 462 463 @Override 464 public void run() { 465 for (RegionScanner scanner : scanners) { 466 try { 467 scanner.close(); 468 } catch (IOException e) { 469 LOG.error("Exception while closing the scanner " + scanner, e); 470 } 471 } 472 } 473 } 474 475 /** 476 * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together. 477 */ 478 static final class RegionScannerHolder { 479 private final AtomicLong nextCallSeq = new AtomicLong(0); 480 private final RegionScanner s; 481 private final HRegion r; 482 private final RpcCallback closeCallBack; 483 private final RpcCallback shippedCallback; 484 private byte[] rowOfLastPartialResult; 485 private boolean needCursor; 486 private boolean fullRegionScan; 487 private final String clientIPAndPort; 488 private final String userName; 489 private volatile long maxBlockBytesScanned = 0; 490 private volatile long prevBlockBytesScanned = 0; 491 private volatile long prevBlockBytesScannedDifference = 0; 492 493 RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack, 494 RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan, 495 String clientIPAndPort, String userName) { 496 this.s = s; 497 this.r = r; 498 this.closeCallBack = closeCallBack; 499 this.shippedCallback = shippedCallback; 500 this.needCursor = needCursor; 501 this.fullRegionScan = fullRegionScan; 502 this.clientIPAndPort = clientIPAndPort; 503 this.userName = userName; 504 } 505 506 long getNextCallSeq() { 507 return nextCallSeq.get(); 508 } 509 510 boolean incNextCallSeq(long currentSeq) { 511 // Use CAS to prevent multiple scan request running on the same scanner. 
512 return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1); 513 } 514 515 long getMaxBlockBytesScanned() { 516 return maxBlockBytesScanned; 517 } 518 519 long getPrevBlockBytesScannedDifference() { 520 return prevBlockBytesScannedDifference; 521 } 522 523 void updateBlockBytesScanned(long blockBytesScanned) { 524 prevBlockBytesScannedDifference = blockBytesScanned - prevBlockBytesScanned; 525 prevBlockBytesScanned = blockBytesScanned; 526 if (blockBytesScanned > maxBlockBytesScanned) { 527 maxBlockBytesScanned = blockBytesScanned; 528 } 529 } 530 531 // Should be called only when we need to print lease expired messages otherwise 532 // cache the String once made. 533 @Override 534 public String toString() { 535 return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName 536 + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString(); 537 } 538 } 539 540 /** 541 * Instantiated as a scanner lease. If the lease times out, the scanner is closed 542 */ 543 private class ScannerListener implements LeaseListener { 544 private final String scannerName; 545 546 ScannerListener(final String n) { 547 this.scannerName = n; 548 } 549 550 @Override 551 public void leaseExpired() { 552 RegionScannerHolder rsh = scanners.remove(this.scannerName); 553 if (rsh == null) { 554 LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName); 555 return; 556 } 557 LOG.info("Scanner lease {} expired {}", this.scannerName, rsh); 558 regionServer.getMetrics().incrScannerLeaseExpired(); 559 RegionScanner s = rsh.s; 560 HRegion region = null; 561 try { 562 region = regionServer.getRegion(s.getRegionInfo().getRegionName()); 563 if (region != null && region.getCoprocessorHost() != null) { 564 region.getCoprocessorHost().preScannerClose(s); 565 } 566 } catch (IOException e) { 567 LOG.error("Closing scanner {} {}", this.scannerName, rsh, e); 568 } finally { 569 try { 570 s.close(); 571 if (region != null && region.getCoprocessorHost() != null) 
{ 572 region.getCoprocessorHost().postScannerClose(s); 573 } 574 } catch (IOException e) { 575 LOG.error("Closing scanner {} {}", this.scannerName, rsh, e); 576 } 577 } 578 } 579 } 580 581 private static ResultOrException getResultOrException(final ClientProtos.Result r, 582 final int index) { 583 return getResultOrException(ResponseConverter.buildActionResult(r), index); 584 } 585 586 private static ResultOrException getResultOrException(final Exception e, final int index) { 587 return getResultOrException(ResponseConverter.buildActionResult(e), index); 588 } 589 590 private static ResultOrException getResultOrException(final ResultOrException.Builder builder, 591 final int index) { 592 return builder.setIndex(index).build(); 593 } 594 595 /** 596 * Checks for the following pre-checks in order: 597 * <ol> 598 * <li>RegionServer is running</li> 599 * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li> 600 * </ol> 601 * @param requestName name of rpc request. Used in reporting failures to provide context. 602 * @throws ServiceException If any of the above listed pre-check fails. 
603 */ 604 private void rpcPreCheck(String requestName) throws ServiceException { 605 try { 606 checkOpen(); 607 requirePermission(requestName, Permission.Action.ADMIN); 608 } catch (IOException ioe) { 609 throw new ServiceException(ioe); 610 } 611 } 612 613 private boolean isClientCellBlockSupport(RpcCallContext context) { 614 return context != null && context.isClientCellBlockSupported(); 615 } 616 617 private void addResult(final MutateResponse.Builder builder, final Result result, 618 final HBaseRpcController rpcc, boolean clientCellBlockSupported) { 619 if (result == null) return; 620 if (clientCellBlockSupported) { 621 builder.setResult(ProtobufUtil.toResultNoData(result)); 622 rpcc.setCellScanner(result.cellScanner()); 623 } else { 624 ClientProtos.Result pbr = ProtobufUtil.toResult(result); 625 builder.setResult(pbr); 626 } 627 } 628 629 private void addResults(ScanResponse.Builder builder, List<Result> results, 630 HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) { 631 builder.setStale(!isDefaultRegion); 632 if (results.isEmpty()) { 633 return; 634 } 635 if (clientCellBlockSupported) { 636 for (Result res : results) { 637 builder.addCellsPerResult(res.size()); 638 builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow()); 639 } 640 controller.setCellScanner(CellUtil.createCellScanner(results)); 641 } else { 642 for (Result res : results) { 643 ClientProtos.Result pbr = ProtobufUtil.toResult(res); 644 builder.addResults(pbr); 645 } 646 } 647 } 648 649 private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions, 650 CellScanner cellScanner, Condition condition, long nonceGroup, 651 ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { 652 int countOfCompleteMutation = 0; 653 try { 654 if (!region.getRegionInfo().isMetaRegion()) { 655 regionServer.getMemStoreFlusher().reclaimMemStoreMemory(); 656 } 657 List<Mutation> mutations = new ArrayList<>(); 658 long nonce = 
HConstants.NO_NONCE; 659 for (ClientProtos.Action action : actions) { 660 if (action.hasGet()) { 661 throw new DoNotRetryIOException( 662 "Atomic put and/or delete only, not a Get=" + action.getGet()); 663 } 664 MutationProto mutation = action.getMutation(); 665 MutationType type = mutation.getMutateType(); 666 switch (type) { 667 case PUT: 668 Put put = ProtobufUtil.toPut(mutation, cellScanner); 669 ++countOfCompleteMutation; 670 checkCellSizeLimit(region, put); 671 spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); 672 mutations.add(put); 673 break; 674 case DELETE: 675 Delete del = ProtobufUtil.toDelete(mutation, cellScanner); 676 ++countOfCompleteMutation; 677 spaceQuotaEnforcement.getPolicyEnforcement(region).check(del); 678 mutations.add(del); 679 break; 680 case INCREMENT: 681 Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner); 682 nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 683 ++countOfCompleteMutation; 684 checkCellSizeLimit(region, increment); 685 spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment); 686 mutations.add(increment); 687 break; 688 case APPEND: 689 Append append = ProtobufUtil.toAppend(mutation, cellScanner); 690 nonce = mutation.hasNonce() ? 
mutation.getNonce() : HConstants.NO_NONCE; 691 ++countOfCompleteMutation; 692 checkCellSizeLimit(region, append); 693 spaceQuotaEnforcement.getPolicyEnforcement(region).check(append); 694 mutations.add(append); 695 break; 696 default: 697 throw new DoNotRetryIOException("invalid mutation type : " + type); 698 } 699 } 700 701 if (mutations.size() == 0) { 702 return new CheckAndMutateResult(true, null); 703 } else { 704 CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations); 705 CheckAndMutateResult result = null; 706 if (region.getCoprocessorHost() != null) { 707 result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate); 708 } 709 if (result == null) { 710 result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce); 711 if (region.getCoprocessorHost() != null) { 712 result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result); 713 } 714 } 715 return result; 716 } 717 } finally { 718 // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner 719 // even if the malformed cells are not skipped. 720 for (int i = countOfCompleteMutation; i < actions.size(); ++i) { 721 skipCellsForMutation(actions.get(i), cellScanner); 722 } 723 } 724 } 725 726 /** 727 * Execute an append mutation. 728 * @return result to return to client if default operation should be bypassed as indicated by 729 * RegionObserver, null otherwise 730 */ 731 private Result append(final HRegion region, final OperationQuota quota, 732 final MutationProto mutation, final CellScanner cellScanner, long nonceGroup, 733 ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException { 734 long before = EnvironmentEdgeManager.currentTime(); 735 Append append = ProtobufUtil.toAppend(mutation, cellScanner); 736 checkCellSizeLimit(region, append); 737 spaceQuota.getPolicyEnforcement(region).check(append); 738 quota.addMutation(append); 739 long blockBytesScannedBefore = context != null ? 
context.getBlockBytesScanned() : 0; 740 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 741 Result r = region.append(append, nonceGroup, nonce); 742 if (regionServer.getMetrics() != null) { 743 long blockBytesScanned = 744 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 745 regionServer.getMetrics().updateAppend(region, EnvironmentEdgeManager.currentTime() - before, 746 blockBytesScanned); 747 } 748 return r == null ? Result.EMPTY_RESULT : r; 749 } 750 751 /** 752 * Execute an increment mutation. 753 */ 754 private Result increment(final HRegion region, final OperationQuota quota, 755 final MutationProto mutation, final CellScanner cells, long nonceGroup, 756 ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException { 757 long before = EnvironmentEdgeManager.currentTime(); 758 Increment increment = ProtobufUtil.toIncrement(mutation, cells); 759 checkCellSizeLimit(region, increment); 760 spaceQuota.getPolicyEnforcement(region).check(increment); 761 quota.addMutation(increment); 762 long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0; 763 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; 764 Result r = region.increment(increment, nonceGroup, nonce); 765 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 766 if (metricsRegionServer != null) { 767 long blockBytesScanned = 768 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 769 metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before, 770 blockBytesScanned); 771 } 772 return r == null ? Result.EMPTY_RESULT : r; 773 } 774 775 /** 776 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when 777 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation. 778 * @param cellsToReturn Could be null. May be allocated in this method. 
This is what this method 779 * returns as a 'result'. 780 * @param closeCallBack the callback to be used with multigets 781 * @param context the current RpcCallContext 782 * @return Return the <code>cellScanner</code> passed 783 */ 784 private List<CellScannable> doNonAtomicRegionMutation(final HRegion region, 785 final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner, 786 final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup, 787 final RegionScannersCloseCallBack closeCallBack, RpcCallContext context, 788 ActivePolicyEnforcement spaceQuotaEnforcement) { 789 // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do 790 // one at a time, we instead pass them in batch. Be aware that the corresponding 791 // ResultOrException instance that matches each Put or Delete is then added down in the 792 // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are 793 // deferred/batched 794 List<ClientProtos.Action> mutations = null; 795 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize()); 796 IOException sizeIOE = null; 797 ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = 798 ResultOrException.newBuilder(); 799 boolean hasResultOrException = false; 800 for (ClientProtos.Action action : actions.getActionList()) { 801 hasResultOrException = false; 802 resultOrExceptionBuilder.clear(); 803 try { 804 Result r = null; 805 long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0; 806 if ( 807 context != null && context.isRetryImmediatelySupported() 808 && (context.getResponseCellSize() > maxQuotaResultSize 809 || blockBytesScannedBefore + context.getResponseExceptionSize() > maxQuotaResultSize) 810 ) { 811 812 // We're storing the exception since the exception and reason string won't 813 // change after the response size limit is reached. 
814 if (sizeIOE == null) { 815 // We don't need the stack un-winding do don't throw the exception. 816 // Throwing will kill the JVM's JIT. 817 // 818 // Instead just create the exception and then store it. 819 sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: " 820 + context.getResponseCellSize() + " BlockSize: " + blockBytesScannedBefore); 821 822 // Only report the exception once since there's only one request that 823 // caused the exception. Otherwise this number will dominate the exceptions count. 824 rpcServer.getMetrics().exception(sizeIOE); 825 } 826 827 // Now that there's an exception is known to be created 828 // use it for the response. 829 // 830 // This will create a copy in the builder. 831 NameBytesPair pair = ResponseConverter.buildException(sizeIOE); 832 resultOrExceptionBuilder.setException(pair); 833 context.incrementResponseExceptionSize(pair.getSerializedSize()); 834 resultOrExceptionBuilder.setIndex(action.getIndex()); 835 builder.addResultOrException(resultOrExceptionBuilder.build()); 836 skipCellsForMutation(action, cellScanner); 837 continue; 838 } 839 if (action.hasGet()) { 840 long before = EnvironmentEdgeManager.currentTime(); 841 ClientProtos.Get pbGet = action.getGet(); 842 // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do 843 // a get closest before. Throwing the UnknownProtocolException signals it that it needs 844 // to switch and do hbase2 protocol (HBase servers do not tell clients what versions 845 // they are; its a problem for non-native clients like asynchbase. HBASE-20225. 846 if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) { 847 throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? 
" 848 + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " 849 + "reverse Scan."); 850 } 851 try { 852 Get get = ProtobufUtil.toGet(pbGet); 853 if (context != null) { 854 r = get(get, (region), closeCallBack, context); 855 } else { 856 r = region.get(get); 857 } 858 } finally { 859 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 860 if (metricsRegionServer != null) { 861 long blockBytesScanned = 862 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 863 metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before, 864 blockBytesScanned); 865 } 866 } 867 } else if (action.hasServiceCall()) { 868 hasResultOrException = true; 869 com.google.protobuf.Message result = execServiceOnRegion(region, action.getServiceCall()); 870 ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder = 871 ClientProtos.CoprocessorServiceResult.newBuilder(); 872 resultOrExceptionBuilder.setServiceResult(serviceResultBuilder 873 .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName()) 874 // TODO: Copy!!! 875 .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray())))); 876 } else if (action.hasMutation()) { 877 MutationType type = action.getMutation().getMutateType(); 878 if ( 879 type != MutationType.PUT && type != MutationType.DELETE && mutations != null 880 && !mutations.isEmpty() 881 ) { 882 // Flush out any Puts or Deletes already collected. 
883 doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, 884 spaceQuotaEnforcement); 885 mutations.clear(); 886 } 887 switch (type) { 888 case APPEND: 889 r = append(region, quota, action.getMutation(), cellScanner, nonceGroup, 890 spaceQuotaEnforcement, context); 891 break; 892 case INCREMENT: 893 r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup, 894 spaceQuotaEnforcement, context); 895 break; 896 case PUT: 897 case DELETE: 898 // Collect the individual mutations and apply in a batch 899 if (mutations == null) { 900 mutations = new ArrayList<>(actions.getActionCount()); 901 } 902 mutations.add(action); 903 break; 904 default: 905 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); 906 } 907 } else { 908 throw new HBaseIOException("Unexpected Action type"); 909 } 910 if (r != null) { 911 ClientProtos.Result pbResult = null; 912 if (isClientCellBlockSupport(context)) { 913 pbResult = ProtobufUtil.toResultNoData(r); 914 // Hard to guess the size here. Just make a rough guess. 915 if (cellsToReturn == null) { 916 cellsToReturn = new ArrayList<>(); 917 } 918 cellsToReturn.add(r); 919 } else { 920 pbResult = ProtobufUtil.toResult(r); 921 } 922 addSize(context, r); 923 hasResultOrException = true; 924 resultOrExceptionBuilder.setResult(pbResult); 925 } 926 // Could get to here and there was no result and no exception. Presumes we added 927 // a Put or Delete to the collecting Mutations List for adding later. In this 928 // case the corresponding ResultOrException instance for the Put or Delete will be added 929 // down in the doNonAtomicBatchOp method call rather than up here. 
930 } catch (IOException ie) { 931 rpcServer.getMetrics().exception(ie); 932 hasResultOrException = true; 933 NameBytesPair pair = ResponseConverter.buildException(ie); 934 resultOrExceptionBuilder.setException(pair); 935 context.incrementResponseExceptionSize(pair.getSerializedSize()); 936 } 937 if (hasResultOrException) { 938 // Propagate index. 939 resultOrExceptionBuilder.setIndex(action.getIndex()); 940 builder.addResultOrException(resultOrExceptionBuilder.build()); 941 } 942 } 943 // Finish up any outstanding mutations 944 if (!CollectionUtils.isEmpty(mutations)) { 945 doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); 946 } 947 return cellsToReturn; 948 } 949 950 private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException { 951 if (r.maxCellSize > 0) { 952 CellScanner cells = m.cellScanner(); 953 while (cells.advance()) { 954 int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current()); 955 if (size > r.maxCellSize) { 956 String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of " 957 + r.maxCellSize + " bytes"; 958 LOG.debug(msg); 959 throw new DoNotRetryIOException(msg); 960 } 961 } 962 } 963 } 964 965 private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, 966 final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells, 967 long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { 968 // Just throw the exception. The exception will be caught and then added to region-level 969 // exception for RegionAction. Leaving the null to action result is ok since the null 970 // result is viewed as failure by hbase client. And the region-lever exception will be used 971 // to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and 972 // AsyncBatchRpcRetryingCaller#onComplete for more details. 
973 doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true); 974 } 975 976 private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, 977 final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells, 978 ActivePolicyEnforcement spaceQuotaEnforcement) { 979 try { 980 doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE, 981 spaceQuotaEnforcement, false); 982 } catch (IOException e) { 983 // Set the exception for each action. The mutations in same RegionAction are group to 984 // different batch and then be processed individually. Hence, we don't set the region-level 985 // exception here for whole RegionAction. 986 for (Action mutation : mutations) { 987 builder.addResultOrException(getResultOrException(e, mutation.getIndex())); 988 } 989 } 990 } 991 992 /** 993 * Execute a list of mutations. 994 */ 995 private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region, 996 final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells, 997 long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic) 998 throws IOException { 999 Mutation[] mArray = new Mutation[mutations.size()]; 1000 long before = EnvironmentEdgeManager.currentTime(); 1001 boolean batchContainsPuts = false, batchContainsDelete = false; 1002 try { 1003 /** 1004 * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions 1005 * since mutation array may have been reoredered.In order to return the right result or 1006 * exception to the corresponding actions, We need to know which action is the mutation belong 1007 * to. We can't sort ClientProtos.Action array, since they are bonded to cellscanners. 
1008 */ 1009 Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>(); 1010 int i = 0; 1011 long nonce = HConstants.NO_NONCE; 1012 for (ClientProtos.Action action : mutations) { 1013 if (action.hasGet()) { 1014 throw new DoNotRetryIOException( 1015 "Atomic put and/or delete only, not a Get=" + action.getGet()); 1016 } 1017 MutationProto m = action.getMutation(); 1018 Mutation mutation; 1019 switch (m.getMutateType()) { 1020 case PUT: 1021 mutation = ProtobufUtil.toPut(m, cells); 1022 batchContainsPuts = true; 1023 break; 1024 1025 case DELETE: 1026 mutation = ProtobufUtil.toDelete(m, cells); 1027 batchContainsDelete = true; 1028 break; 1029 1030 case INCREMENT: 1031 mutation = ProtobufUtil.toIncrement(m, cells); 1032 nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE; 1033 break; 1034 1035 case APPEND: 1036 mutation = ProtobufUtil.toAppend(m, cells); 1037 nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE; 1038 break; 1039 1040 default: 1041 throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType()); 1042 } 1043 mutationActionMap.put(mutation, action); 1044 mArray[i++] = mutation; 1045 checkCellSizeLimit(region, mutation); 1046 // Check if a space quota disallows this mutation 1047 spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation); 1048 quota.addMutation(mutation); 1049 } 1050 1051 if (!region.getRegionInfo().isMetaRegion()) { 1052 regionServer.getMemStoreFlusher().reclaimMemStoreMemory(); 1053 } 1054 1055 // HBASE-17924 1056 // Sort to improve lock efficiency for non-atomic batch of operations. If atomic 1057 // order is preserved as its expected from the client 1058 if (!atomic) { 1059 Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2)); 1060 } 1061 1062 OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce); 1063 1064 // When atomic is true, it indicates that the mutateRow API or the batch API with 1065 // RowMutations is called. 
In this case, we need to merge the results of the 1066 // Increment/Append operations if the mutations include those operations, and set the merged 1067 // result to the first element of the ResultOrException list 1068 if (atomic) { 1069 List<ResultOrException> resultOrExceptions = new ArrayList<>(); 1070 List<Result> results = new ArrayList<>(); 1071 for (i = 0; i < codes.length; i++) { 1072 if (codes[i].getResult() != null) { 1073 results.add(codes[i].getResult()); 1074 } 1075 if (i != 0) { 1076 resultOrExceptions 1077 .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i)); 1078 } 1079 } 1080 1081 if (results.isEmpty()) { 1082 builder.addResultOrException( 1083 getResultOrException(ClientProtos.Result.getDefaultInstance(), 0)); 1084 } else { 1085 // Merge the results of the Increment/Append operations 1086 List<Cell> cellList = new ArrayList<>(); 1087 for (Result result : results) { 1088 if (result.rawCells() != null) { 1089 cellList.addAll(Arrays.asList(result.rawCells())); 1090 } 1091 } 1092 Result result = Result.create(cellList); 1093 1094 // Set the merged result of the Increment/Append operations to the first element of the 1095 // ResultOrException list 1096 builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0)); 1097 } 1098 1099 builder.addAllResultOrException(resultOrExceptions); 1100 return; 1101 } 1102 1103 for (i = 0; i < codes.length; i++) { 1104 Mutation currentMutation = mArray[i]; 1105 ClientProtos.Action currentAction = mutationActionMap.get(currentMutation); 1106 int index = currentAction.hasIndex() ? 
currentAction.getIndex() : i; 1107 Exception e; 1108 switch (codes[i].getOperationStatusCode()) { 1109 case BAD_FAMILY: 1110 e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg()); 1111 builder.addResultOrException(getResultOrException(e, index)); 1112 break; 1113 1114 case SANITY_CHECK_FAILURE: 1115 e = new FailedSanityCheckException(codes[i].getExceptionMsg()); 1116 builder.addResultOrException(getResultOrException(e, index)); 1117 break; 1118 1119 default: 1120 e = new DoNotRetryIOException(codes[i].getExceptionMsg()); 1121 builder.addResultOrException(getResultOrException(e, index)); 1122 break; 1123 1124 case SUCCESS: 1125 ClientProtos.Result result = codes[i].getResult() == null 1126 ? ClientProtos.Result.getDefaultInstance() 1127 : ProtobufUtil.toResult(codes[i].getResult()); 1128 builder.addResultOrException(getResultOrException(result, index)); 1129 break; 1130 1131 case STORE_TOO_BUSY: 1132 e = new RegionTooBusyException(codes[i].getExceptionMsg()); 1133 builder.addResultOrException(getResultOrException(e, index)); 1134 break; 1135 } 1136 } 1137 } finally { 1138 int processedMutationIndex = 0; 1139 for (Action mutation : mutations) { 1140 // The non-null mArray[i] means the cell scanner has been read. 
1141 if (mArray[processedMutationIndex++] == null) { 1142 skipCellsForMutation(mutation, cells); 1143 } 1144 } 1145 updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete); 1146 } 1147 } 1148 1149 private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts, 1150 boolean batchContainsDelete) { 1151 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 1152 if (metricsRegionServer != null) { 1153 long after = EnvironmentEdgeManager.currentTime(); 1154 if (batchContainsPuts) { 1155 metricsRegionServer.updatePutBatch(region, after - starttime); 1156 } 1157 if (batchContainsDelete) { 1158 metricsRegionServer.updateDeleteBatch(region, after - starttime); 1159 } 1160 } 1161 } 1162 1163 /** 1164 * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of 1165 * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse. 1166 * @return an array of OperationStatus which internally contains the OperationStatusCode and the 1167 * exceptionMessage if any 1168 */ 1169 private OperationStatus[] doReplayBatchOp(final HRegion region, 1170 final List<MutationReplay> mutations, long replaySeqId) throws IOException { 1171 long before = EnvironmentEdgeManager.currentTime(); 1172 boolean batchContainsPuts = false, batchContainsDelete = false; 1173 try { 1174 for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) { 1175 MutationReplay m = it.next(); 1176 1177 if (m.getType() == MutationType.PUT) { 1178 batchContainsPuts = true; 1179 } else { 1180 batchContainsDelete = true; 1181 } 1182 1183 NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap(); 1184 List<Cell> metaCells = map.get(WALEdit.METAFAMILY); 1185 if (metaCells != null && !metaCells.isEmpty()) { 1186 for (Cell metaCell : metaCells) { 1187 CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell); 1188 boolean isDefaultReplica = 
RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); 1189 HRegion hRegion = region; 1190 if (compactionDesc != null) { 1191 // replay the compaction. Remove the files from stores only if we are the primary 1192 // region replica (thus own the files) 1193 hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica, 1194 replaySeqId); 1195 continue; 1196 } 1197 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell); 1198 if (flushDesc != null && !isDefaultReplica) { 1199 hRegion.replayWALFlushMarker(flushDesc, replaySeqId); 1200 continue; 1201 } 1202 RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell); 1203 if (regionEvent != null && !isDefaultReplica) { 1204 hRegion.replayWALRegionEventMarker(regionEvent); 1205 continue; 1206 } 1207 BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell); 1208 if (bulkLoadEvent != null) { 1209 hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent); 1210 continue; 1211 } 1212 } 1213 it.remove(); 1214 } 1215 } 1216 requestCount.increment(); 1217 if (!region.getRegionInfo().isMetaRegion()) { 1218 regionServer.getMemStoreFlusher().reclaimMemStoreMemory(); 1219 } 1220 return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]), 1221 replaySeqId); 1222 } finally { 1223 updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete); 1224 } 1225 } 1226 1227 private void closeAllScanners() { 1228 // Close any outstanding scanners. Means they'll get an UnknownScanner 1229 // exception next time they come in. 
1230 for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) { 1231 try { 1232 e.getValue().s.close(); 1233 } catch (IOException ioe) { 1234 LOG.warn("Closing scanner " + e.getKey(), ioe); 1235 } 1236 } 1237 } 1238 1239 // Directly invoked only for testing 1240 public RSRpcServices(final HRegionServer rs) throws IOException { 1241 final Configuration conf = rs.getConfiguration(); 1242 regionServer = rs; 1243 final RpcSchedulerFactory rpcSchedulerFactory; 1244 try { 1245 rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class) 1246 .getDeclaredConstructor().newInstance(); 1247 } catch (NoSuchMethodException | InvocationTargetException | InstantiationException 1248 | IllegalAccessException e) { 1249 throw new IllegalArgumentException(e); 1250 } 1251 // Server to handle client requests. 1252 final InetSocketAddress initialIsa; 1253 final InetSocketAddress bindAddress; 1254 if (this instanceof MasterRpcServices) { 1255 String hostname = DNS.getHostname(conf, DNS.ServerType.MASTER); 1256 int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT); 1257 // Creation of a HSA will force a resolve. 1258 initialIsa = new InetSocketAddress(hostname, port); 1259 bindAddress = new InetSocketAddress(conf.get("hbase.master.ipc.address", hostname), port); 1260 } else { 1261 String hostname = DNS.getHostname(conf, DNS.ServerType.REGIONSERVER); 1262 int port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT); 1263 // Creation of a HSA will force a resolve. 1264 initialIsa = new InetSocketAddress(hostname, port); 1265 bindAddress = 1266 new InetSocketAddress(conf.get("hbase.regionserver.ipc.address", hostname), port); 1267 } 1268 if (initialIsa.getAddress() == null) { 1269 throw new IllegalArgumentException("Failed resolve of " + initialIsa); 1270 } 1271 priority = createPriority(); 1272 // Using Address means we don't get the IP too. 
Shorten it more even to just the host name 1273 // w/o the domain. 1274 final String name = rs.getProcessName() + "/" 1275 + Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain(); 1276 // Set how many times to retry talking to another server over Connection. 1277 ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG); 1278 rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name); 1279 rpcServer.setRsRpcServices(this); 1280 if (!(rs instanceof HMaster)) { 1281 rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder()); 1282 } 1283 setReloadableGuardrails(conf); 1284 scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 1285 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); 1286 rpcTimeout = 1287 conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 1288 minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, 1289 DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA); 1290 1291 final InetSocketAddress address = rpcServer.getListenerAddress(); 1292 if (address == null) { 1293 throw new IOException("Listener channel is closed"); 1294 } 1295 // Set our address, however we need the final port that was given to rpcServer 1296 isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort()); 1297 rpcServer.setErrorHandler(this); 1298 rs.setName(name); 1299 1300 closedScanners = CacheBuilder.newBuilder() 1301 .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build(); 1302 } 1303 1304 protected RpcServerInterface createRpcServer(final Server server, 1305 final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress, 1306 final String name) throws IOException { 1307 final Configuration conf = server.getConfiguration(); 1308 boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true); 1309 try { 1310 return 
RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, // use final 1311 // bindAddress 1312 // for this 1313 // server. 1314 conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled); 1315 } catch (BindException be) { 1316 throw new IOException(be.getMessage() + ". To switch ports use the '" 1317 + HConstants.REGIONSERVER_PORT + "' configuration property.", 1318 be.getCause() != null ? be.getCause() : be); 1319 } 1320 } 1321 1322 protected Class<?> getRpcSchedulerFactoryClass() { 1323 final Configuration conf = regionServer.getConfiguration(); 1324 return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, 1325 SimpleRpcSchedulerFactory.class); 1326 } 1327 1328 @Override 1329 public void onConfigurationChange(Configuration newConf) { 1330 if (rpcServer instanceof ConfigurationObserver) { 1331 ((ConfigurationObserver) rpcServer).onConfigurationChange(newConf); 1332 setReloadableGuardrails(newConf); 1333 } 1334 } 1335 1336 protected PriorityFunction createPriority() { 1337 return new AnnotationReadingPriorityFunction(this); 1338 } 1339 1340 protected void requirePermission(String request, Permission.Action perm) throws IOException { 1341 if (accessChecker != null) { 1342 accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), request, null, perm); 1343 } 1344 } 1345 1346 public int getScannersCount() { 1347 return scanners.size(); 1348 } 1349 1350 /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */ 1351 RegionScanner getScanner(long scannerId) { 1352 RegionScannerHolder rsh = getRegionScannerHolder(scannerId); 1353 return rsh == null ? null : rsh.s; 1354 } 1355 1356 /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null. 
*/ 1357 private RegionScannerHolder getRegionScannerHolder(long scannerId) { 1358 return scanners.get(toScannerName(scannerId)); 1359 } 1360 1361 public String getScanDetailsWithId(long scannerId) { 1362 RegionScanner scanner = getScanner(scannerId); 1363 if (scanner == null) { 1364 return null; 1365 } 1366 StringBuilder builder = new StringBuilder(); 1367 builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString()); 1368 builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString()); 1369 builder.append(" operation_id: ").append(scanner.getOperationId()); 1370 return builder.toString(); 1371 } 1372 1373 public String getScanDetailsWithRequest(ScanRequest request) { 1374 try { 1375 if (!request.hasRegion()) { 1376 return null; 1377 } 1378 Region region = getRegion(request.getRegion()); 1379 StringBuilder builder = new StringBuilder(); 1380 builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString()); 1381 builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString()); 1382 for (NameBytesPair pair : request.getScan().getAttributeList()) { 1383 if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) { 1384 builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray())); 1385 break; 1386 } 1387 } 1388 return builder.toString(); 1389 } catch (IOException ignored) { 1390 return null; 1391 } 1392 } 1393 1394 /** 1395 * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls. 1396 */ 1397 long getScannerVirtualTime(long scannerId) { 1398 RegionScannerHolder rsh = getRegionScannerHolder(scannerId); 1399 return rsh == null ? 0L : rsh.getNextCallSeq(); 1400 } 1401 1402 /** 1403 * Method to account for the size of retained cells. 1404 * @param context rpc call context 1405 * @param r result to add size. 1406 * @return an object that represents the last referenced block from this response. 
1407 */ 1408 void addSize(RpcCallContext context, Result r) { 1409 if (context != null && r != null && !r.isEmpty()) { 1410 for (Cell c : r.rawCells()) { 1411 context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c)); 1412 } 1413 } 1414 } 1415 1416 /** Returns Remote client's ip and port else null if can't be determined. */ 1417 @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices", 1418 link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java") 1419 static String getRemoteClientIpAndPort() { 1420 RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null); 1421 if (rpcCall == null) { 1422 return HConstants.EMPTY_STRING; 1423 } 1424 InetAddress address = rpcCall.getRemoteAddress(); 1425 if (address == null) { 1426 return HConstants.EMPTY_STRING; 1427 } 1428 // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name 1429 // resolution. Just use the IP. It is generally a smaller amount of info to keep around while 1430 // scanning than a hostname anyways. 1431 return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString(); 1432 } 1433 1434 /** Returns Remote client's username. 
*/ 1435 @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices", 1436 link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java") 1437 static String getUserName() { 1438 RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null); 1439 if (rpcCall == null) { 1440 return HConstants.EMPTY_STRING; 1441 } 1442 return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING); 1443 } 1444 1445 private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper, 1446 HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException { 1447 Lease lease = regionServer.getLeaseManager().createLease(scannerName, 1448 this.scannerLeaseTimeoutPeriod, new ScannerListener(scannerName)); 1449 RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease); 1450 RpcCallback closeCallback = 1451 s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s); 1452 RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback, 1453 needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName()); 1454 RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh); 1455 assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! 
" 1456 + scannerName + ", " + existing; 1457 return rsh; 1458 } 1459 1460 private boolean isFullRegionScan(Scan scan, HRegion region) { 1461 // If the scan start row equals or less than the start key of the region 1462 // and stop row greater than equals end key (if stop row present) 1463 // or if the stop row is empty 1464 // account this as a full region scan 1465 if ( 1466 Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0 1467 && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0 1468 && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW) 1469 || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) 1470 ) { 1471 return true; 1472 } 1473 return false; 1474 } 1475 1476 /** 1477 * Find the HRegion based on a region specifier 1478 * @param regionSpecifier the region specifier 1479 * @return the corresponding region 1480 * @throws IOException if the specifier is not null, but failed to find the region 1481 */ 1482 public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException { 1483 return regionServer.getRegion(regionSpecifier.getValue().toByteArray()); 1484 } 1485 1486 /** 1487 * Find the List of HRegions based on a list of region specifiers 1488 * @param regionSpecifiers the list of region specifiers 1489 * @return the corresponding list of regions 1490 * @throws IOException if any of the specifiers is not null, but failed to find the region 1491 */ 1492 private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers, 1493 final CacheEvictionStatsBuilder stats) { 1494 List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size()); 1495 for (RegionSpecifier regionSpecifier : regionSpecifiers) { 1496 try { 1497 regions.add(regionServer.getRegion(regionSpecifier.getValue().toByteArray())); 1498 } catch (NotServingRegionException e) { 1499 stats.addException(regionSpecifier.getValue().toByteArray(), e); 1500 } 1501 } 1502 return 
regions; 1503 } 1504 1505 public PriorityFunction getPriority() { 1506 return priority; 1507 } 1508 1509 public Configuration getConfiguration() { 1510 return regionServer.getConfiguration(); 1511 } 1512 1513 private RegionServerRpcQuotaManager getRpcQuotaManager() { 1514 return regionServer.getRegionServerRpcQuotaManager(); 1515 } 1516 1517 private RegionServerSpaceQuotaManager getSpaceQuotaManager() { 1518 return regionServer.getRegionServerSpaceQuotaManager(); 1519 } 1520 1521 void start(ZKWatcher zkWatcher) { 1522 if (AccessChecker.isAuthorizationSupported(getConfiguration())) { 1523 accessChecker = new AccessChecker(getConfiguration()); 1524 } else { 1525 accessChecker = new NoopAccessChecker(getConfiguration()); 1526 } 1527 zkPermissionWatcher = 1528 new ZKPermissionWatcher(zkWatcher, accessChecker.getAuthManager(), getConfiguration()); 1529 try { 1530 zkPermissionWatcher.start(); 1531 } catch (KeeperException e) { 1532 LOG.error("ZooKeeper permission watcher initialization failed", e); 1533 } 1534 this.scannerIdGenerator = new ScannerIdGenerator(this.regionServer.serverName); 1535 rpcServer.start(); 1536 } 1537 1538 void stop() { 1539 if (zkPermissionWatcher != null) { 1540 zkPermissionWatcher.close(); 1541 } 1542 closeAllScanners(); 1543 rpcServer.stop(); 1544 } 1545 1546 /** 1547 * Called to verify that this server is up and running. 1548 */ 1549 // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name). 
1550 protected void checkOpen() throws IOException { 1551 if (regionServer.isAborted()) { 1552 throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting"); 1553 } 1554 if (regionServer.isStopped()) { 1555 throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping"); 1556 } 1557 if (!regionServer.isDataFileSystemOk()) { 1558 throw new RegionServerStoppedException("File system not available"); 1559 } 1560 if (!regionServer.isOnline()) { 1561 throw new ServerNotRunningYetException( 1562 "Server " + regionServer.serverName + " is not running yet"); 1563 } 1564 } 1565 1566 /** 1567 * By default, put up an Admin and a Client Service. Set booleans 1568 * <code>hbase.regionserver.admin.executorService</code> and 1569 * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services. 1570 * Default is that both are enabled. 1571 * @return immutable list of blocking services and the security info classes that this server 1572 * supports 1573 */ 1574 protected List<BlockingServiceAndInterface> getServices() { 1575 boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true); 1576 boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true); 1577 boolean clientMeta = 1578 getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true); 1579 boolean bootstrapNodes = 1580 getConfiguration().getBoolean(REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG, true); 1581 List<BlockingServiceAndInterface> bssi = new ArrayList<>(); 1582 if (client) { 1583 bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this), 1584 ClientService.BlockingInterface.class)); 1585 } 1586 if (admin) { 1587 bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this), 1588 AdminService.BlockingInterface.class)); 1589 } 1590 if (clientMeta) { 1591 bssi.add(new 
BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this), 1592 ClientMetaService.BlockingInterface.class)); 1593 } 1594 if (bootstrapNodes) { 1595 bssi.add( 1596 new BlockingServiceAndInterface(BootstrapNodeService.newReflectiveBlockingService(this), 1597 BootstrapNodeService.BlockingInterface.class)); 1598 } 1599 return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build(); 1600 } 1601 1602 public InetSocketAddress getSocketAddress() { 1603 return isa; 1604 } 1605 1606 @Override 1607 public int getPriority(RequestHeader header, Message param, User user) { 1608 return priority.getPriority(header, param, user); 1609 } 1610 1611 @Override 1612 public long getDeadline(RequestHeader header, Message param) { 1613 return priority.getDeadline(header, param); 1614 } 1615 1616 /* 1617 * Check if an OOME and, if so, abort immediately to avoid creating more objects. 1618 * @return True if we OOME'd and are aborting. 1619 */ 1620 @Override 1621 public boolean checkOOME(final Throwable e) { 1622 return exitIfOOME(e); 1623 } 1624 1625 public static boolean exitIfOOME(final Throwable e) { 1626 boolean stop = false; 1627 try { 1628 if ( 1629 e instanceof OutOfMemoryError 1630 || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError) 1631 || (e.getMessage() != null && e.getMessage().contains("java.lang.OutOfMemoryError")) 1632 ) { 1633 stop = true; 1634 LOG.error(HBaseMarkers.FATAL, "Run out of memory; " + RSRpcServices.class.getSimpleName() 1635 + " will abort itself immediately", e); 1636 } 1637 } finally { 1638 if (stop) { 1639 Runtime.getRuntime().halt(1); 1640 } 1641 } 1642 return stop; 1643 } 1644 1645 /** 1646 * Close a region on the region server. 
1647 * @param controller the RPC controller 1648 * @param request the request 1649 */ 1650 @Override 1651 @QosPriority(priority = HConstants.ADMIN_QOS) 1652 public CloseRegionResponse closeRegion(final RpcController controller, 1653 final CloseRegionRequest request) throws ServiceException { 1654 final ServerName sn = (request.hasDestinationServer() 1655 ? ProtobufUtil.toServerName(request.getDestinationServer()) 1656 : null); 1657 1658 try { 1659 checkOpen(); 1660 throwOnWrongStartCode(request); 1661 final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion()); 1662 1663 requestCount.increment(); 1664 if (sn == null) { 1665 LOG.info("Close " + encodedRegionName + " without moving"); 1666 } else { 1667 LOG.info("Close " + encodedRegionName + ", moving to " + sn); 1668 } 1669 boolean closed = regionServer.closeRegion(encodedRegionName, false, sn); 1670 CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed); 1671 return builder.build(); 1672 } catch (IOException ie) { 1673 throw new ServiceException(ie); 1674 } 1675 } 1676 1677 /** 1678 * Compact a region on the region server. 1679 * @param controller the RPC controller 1680 * @param request the request 1681 */ 1682 @Override 1683 @QosPriority(priority = HConstants.ADMIN_QOS) 1684 public CompactRegionResponse compactRegion(final RpcController controller, 1685 final CompactRegionRequest request) throws ServiceException { 1686 try { 1687 checkOpen(); 1688 requestCount.increment(); 1689 HRegion region = getRegion(request.getRegion()); 1690 // Quota support is enabled, the requesting user is not system/super user 1691 // and a quota policy is enforced that disables compactions. 
1692 if ( 1693 QuotaUtil.isQuotaEnabled(getConfiguration()) 1694 && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null)) 1695 && this.regionServer.getRegionServerSpaceQuotaManager() 1696 .areCompactionsDisabled(region.getTableDescriptor().getTableName()) 1697 ) { 1698 throw new DoNotRetryIOException( 1699 "Compactions on this region are " + "disabled due to a space quota violation."); 1700 } 1701 region.startRegionOperation(Operation.COMPACT_REGION); 1702 LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString()); 1703 boolean major = request.hasMajor() && request.getMajor(); 1704 if (request.hasFamily()) { 1705 byte[] family = request.getFamily().toByteArray(); 1706 String log = "User-triggered " + (major ? "major " : "") + "compaction for region " 1707 + region.getRegionInfo().getRegionNameAsString() + " and family " 1708 + Bytes.toString(family); 1709 LOG.trace(log); 1710 region.requestCompaction(family, log, Store.PRIORITY_USER, major, 1711 CompactionLifeCycleTracker.DUMMY); 1712 } else { 1713 String log = "User-triggered " + (major ? 
"major " : "") + "compaction for region " 1714 + region.getRegionInfo().getRegionNameAsString(); 1715 LOG.trace(log); 1716 region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY); 1717 } 1718 return CompactRegionResponse.newBuilder().build(); 1719 } catch (IOException ie) { 1720 throw new ServiceException(ie); 1721 } 1722 } 1723 1724 @Override 1725 public CompactionSwitchResponse compactionSwitch(RpcController controller, 1726 CompactionSwitchRequest request) throws ServiceException { 1727 rpcPreCheck("compactionSwitch"); 1728 final CompactSplit compactSplitThread = regionServer.getCompactSplitThread(); 1729 requestCount.increment(); 1730 boolean prevState = compactSplitThread.isCompactionsEnabled(); 1731 CompactionSwitchResponse response = 1732 CompactionSwitchResponse.newBuilder().setPrevState(prevState).build(); 1733 if (prevState == request.getEnabled()) { 1734 // passed in requested state is same as current state. No action required 1735 return response; 1736 } 1737 compactSplitThread.switchCompaction(request.getEnabled()); 1738 return response; 1739 } 1740 1741 /** 1742 * Flush a region on the region server. 1743 * @param controller the RPC controller 1744 * @param request the request 1745 */ 1746 @Override 1747 @QosPriority(priority = HConstants.ADMIN_QOS) 1748 public FlushRegionResponse flushRegion(final RpcController controller, 1749 final FlushRegionRequest request) throws ServiceException { 1750 try { 1751 checkOpen(); 1752 requestCount.increment(); 1753 HRegion region = getRegion(request.getRegion()); 1754 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); 1755 boolean shouldFlush = true; 1756 if (request.hasIfOlderThanTs()) { 1757 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs(); 1758 } 1759 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); 1760 if (shouldFlush) { 1761 boolean writeFlushWalMarker = 1762 request.hasWriteFlushWalMarker() ? 
request.getWriteFlushWalMarker() : false; 1763 // Go behind the curtain so we can manage writing of the flush WAL marker 1764 HRegion.FlushResultImpl flushResult = null; 1765 if (request.hasFamily()) { 1766 List families = new ArrayList(); 1767 families.add(request.getFamily().toByteArray()); 1768 flushResult = 1769 region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1770 } else { 1771 flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1772 } 1773 boolean compactionNeeded = flushResult.isCompactionNeeded(); 1774 if (compactionNeeded) { 1775 regionServer.getCompactSplitThread().requestSystemCompaction(region, 1776 "Compaction through user triggered flush"); 1777 } 1778 builder.setFlushed(flushResult.isFlushSucceeded()); 1779 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker); 1780 } 1781 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores()); 1782 return builder.build(); 1783 } catch (DroppedSnapshotException ex) { 1784 // Cache flush can fail in a few places. If it fails in a critical 1785 // section, we get a DroppedSnapshotException and a replay of wal 1786 // is required. Currently the only way to do this is a restart of 1787 // the server. 1788 regionServer.abort("Replay of WAL required. 
Forcing server shutdown", ex); 1789 throw new ServiceException(ex); 1790 } catch (IOException ie) { 1791 throw new ServiceException(ie); 1792 } 1793 } 1794 1795 @Override 1796 @QosPriority(priority = HConstants.ADMIN_QOS) 1797 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, 1798 final GetOnlineRegionRequest request) throws ServiceException { 1799 try { 1800 checkOpen(); 1801 requestCount.increment(); 1802 Map<String, HRegion> onlineRegions = regionServer.getOnlineRegions(); 1803 List<RegionInfo> list = new ArrayList<>(onlineRegions.size()); 1804 for (HRegion region : onlineRegions.values()) { 1805 list.add(region.getRegionInfo()); 1806 } 1807 list.sort(RegionInfo.COMPARATOR); 1808 return ResponseConverter.buildGetOnlineRegionResponse(list); 1809 } catch (IOException ie) { 1810 throw new ServiceException(ie); 1811 } 1812 } 1813 1814 // Master implementation of this Admin Service differs given it is not 1815 // able to supply detail only known to RegionServer. See note on 1816 // MasterRpcServers#getRegionInfo. 
1817 @Override 1818 @QosPriority(priority = HConstants.ADMIN_QOS) 1819 public GetRegionInfoResponse getRegionInfo(final RpcController controller, 1820 final GetRegionInfoRequest request) throws ServiceException { 1821 try { 1822 checkOpen(); 1823 requestCount.increment(); 1824 HRegion region = getRegion(request.getRegion()); 1825 RegionInfo info = region.getRegionInfo(); 1826 byte[] bestSplitRow; 1827 if (request.hasBestSplitRow() && request.getBestSplitRow()) { 1828 bestSplitRow = region.checkSplit(true).orElse(null); 1829 // when all table data are in memstore, bestSplitRow = null 1830 // try to flush region first 1831 if (bestSplitRow == null) { 1832 region.flush(true); 1833 bestSplitRow = region.checkSplit(true).orElse(null); 1834 } 1835 } else { 1836 bestSplitRow = null; 1837 } 1838 GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder(); 1839 builder.setRegionInfo(ProtobufUtil.toRegionInfo(info)); 1840 if (request.hasCompactionState() && request.getCompactionState()) { 1841 builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState())); 1842 } 1843 builder.setSplittable(region.isSplittable()); 1844 builder.setMergeable(region.isMergeable()); 1845 if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) { 1846 builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow)); 1847 } 1848 return builder.build(); 1849 } catch (IOException ie) { 1850 throw new ServiceException(ie); 1851 } 1852 } 1853 1854 @Override 1855 @QosPriority(priority = HConstants.ADMIN_QOS) 1856 public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request) 1857 throws ServiceException { 1858 1859 List<HRegion> regions; 1860 if (request.hasTableName()) { 1861 TableName tableName = ProtobufUtil.toTableName(request.getTableName()); 1862 regions = regionServer.getRegions(tableName); 1863 } else { 1864 regions = regionServer.getRegions(); 1865 } 1866 List<RegionLoad> rLoads = 
new ArrayList<>(regions.size()); 1867 RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder(); 1868 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder(); 1869 1870 try { 1871 for (HRegion region : regions) { 1872 rLoads.add(regionServer.createRegionLoad(region, regionLoadBuilder, regionSpecifier)); 1873 } 1874 } catch (IOException e) { 1875 throw new ServiceException(e); 1876 } 1877 GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder(); 1878 builder.addAllRegionLoads(rLoads); 1879 return builder.build(); 1880 } 1881 1882 @Override 1883 @QosPriority(priority = HConstants.ADMIN_QOS) 1884 public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller, 1885 ClearCompactionQueuesRequest request) throws ServiceException { 1886 LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/" 1887 + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue"); 1888 ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder(); 1889 requestCount.increment(); 1890 if (clearCompactionQueues.compareAndSet(false, true)) { 1891 final CompactSplit compactSplitThread = regionServer.getCompactSplitThread(); 1892 try { 1893 checkOpen(); 1894 regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues(); 1895 for (String queueName : request.getQueueNameList()) { 1896 LOG.debug("clear " + queueName + " compaction queue"); 1897 switch (queueName) { 1898 case "long": 1899 compactSplitThread.clearLongCompactionsQueue(); 1900 break; 1901 case "short": 1902 compactSplitThread.clearShortCompactionsQueue(); 1903 break; 1904 default: 1905 LOG.warn("Unknown queue name " + queueName); 1906 throw new IOException("Unknown queue name " + queueName); 1907 } 1908 } 1909 regionServer.getRegionServerCoprocessorHost().postClearCompactionQueues(); 1910 } catch (IOException ie) { 1911 throw new ServiceException(ie); 1912 } finally { 1913 
clearCompactionQueues.set(false); 1914 } 1915 } else { 1916 LOG.warn("Clear compactions queue is executing by other admin."); 1917 } 1918 return respBuilder.build(); 1919 } 1920 1921 /** 1922 * Get some information of the region server. 1923 * @param controller the RPC controller 1924 * @param request the request 1925 */ 1926 @Override 1927 @QosPriority(priority = HConstants.ADMIN_QOS) 1928 public GetServerInfoResponse getServerInfo(final RpcController controller, 1929 final GetServerInfoRequest request) throws ServiceException { 1930 try { 1931 checkOpen(); 1932 } catch (IOException ie) { 1933 throw new ServiceException(ie); 1934 } 1935 requestCount.increment(); 1936 int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1; 1937 return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort); 1938 } 1939 1940 @Override 1941 @QosPriority(priority = HConstants.ADMIN_QOS) 1942 public GetStoreFileResponse getStoreFile(final RpcController controller, 1943 final GetStoreFileRequest request) throws ServiceException { 1944 try { 1945 checkOpen(); 1946 HRegion region = getRegion(request.getRegion()); 1947 requestCount.increment(); 1948 Set<byte[]> columnFamilies; 1949 if (request.getFamilyCount() == 0) { 1950 columnFamilies = region.getTableDescriptor().getColumnFamilyNames(); 1951 } else { 1952 columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR); 1953 for (ByteString cf : request.getFamilyList()) { 1954 columnFamilies.add(cf.toByteArray()); 1955 } 1956 } 1957 int nCF = columnFamilies.size(); 1958 List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][])); 1959 GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder(); 1960 builder.addAllStoreFile(fileList); 1961 return builder.build(); 1962 } catch (IOException ie) { 1963 throw new ServiceException(ie); 1964 } 1965 } 1966 1967 private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException { 1968 
if (!request.hasServerStartCode()) { 1969 LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList()); 1970 return; 1971 } 1972 throwOnWrongStartCode(request.getServerStartCode()); 1973 } 1974 1975 private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException { 1976 if (!request.hasServerStartCode()) { 1977 LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion()); 1978 return; 1979 } 1980 throwOnWrongStartCode(request.getServerStartCode()); 1981 } 1982 1983 private void throwOnWrongStartCode(long serverStartCode) throws ServiceException { 1984 // check that we are the same server that this RPC is intended for. 1985 if (regionServer.serverName.getStartcode() != serverStartCode) { 1986 throw new ServiceException(new DoNotRetryIOException( 1987 "This RPC was intended for a " + "different server with startCode: " + serverStartCode 1988 + ", this server is: " + regionServer.serverName)); 1989 } 1990 } 1991 1992 private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException { 1993 if (req.getOpenRegionCount() > 0) { 1994 for (OpenRegionRequest openReq : req.getOpenRegionList()) { 1995 throwOnWrongStartCode(openReq); 1996 } 1997 } 1998 if (req.getCloseRegionCount() > 0) { 1999 for (CloseRegionRequest closeReq : req.getCloseRegionList()) { 2000 throwOnWrongStartCode(closeReq); 2001 } 2002 } 2003 } 2004 2005 /** 2006 * Open asynchronously a region or a set of regions on the region server. The opening is 2007 * coordinated by ZooKeeper, and this method requires the znode to be created before being called. 2008 * As a consequence, this method should be called only from the master. 
2009 * <p> 2010 * Different manages states for the region are: 2011 * </p> 2012 * <ul> 2013 * <li>region not opened: the region opening will start asynchronously.</li> 2014 * <li>a close is already in progress: this is considered as an error.</li> 2015 * <li>an open is already in progress: this new open request will be ignored. This is important 2016 * because the Master can do multiple requests if it crashes.</li> 2017 * <li>the region is already opened: this new open request will be ignored.</li> 2018 * </ul> 2019 * <p> 2020 * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign. 2021 * For a single region opening, errors are sent through a ServiceException. For bulk assign, 2022 * errors are put in the response as FAILED_OPENING. 2023 * </p> 2024 * @param controller the RPC controller 2025 * @param request the request 2026 */ 2027 @Override 2028 @QosPriority(priority = HConstants.ADMIN_QOS) 2029 public OpenRegionResponse openRegion(final RpcController controller, 2030 final OpenRegionRequest request) throws ServiceException { 2031 requestCount.increment(); 2032 throwOnWrongStartCode(request); 2033 2034 OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder(); 2035 final int regionCount = request.getOpenInfoCount(); 2036 final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount); 2037 final boolean isBulkAssign = regionCount > 1; 2038 try { 2039 checkOpen(); 2040 } catch (IOException ie) { 2041 TableName tableName = null; 2042 if (regionCount == 1) { 2043 org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri = 2044 request.getOpenInfo(0).getRegion(); 2045 if (ri != null) { 2046 tableName = ProtobufUtil.toTableName(ri.getTableName()); 2047 } 2048 } 2049 if (!TableName.META_TABLE_NAME.equals(tableName)) { 2050 throw new ServiceException(ie); 2051 } 2052 // We are assigning meta, wait a little for regionserver to finish initialization. 
2053 // Default to quarter of RPC timeout 2054 int timeout = regionServer.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 2055 HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; 2056 long endTime = EnvironmentEdgeManager.currentTime() + timeout; 2057 synchronized (regionServer.online) { 2058 try { 2059 while ( 2060 EnvironmentEdgeManager.currentTime() <= endTime && !regionServer.isStopped() 2061 && !regionServer.isOnline() 2062 ) { 2063 regionServer.online.wait(regionServer.msgInterval); 2064 } 2065 checkOpen(); 2066 } catch (InterruptedException t) { 2067 Thread.currentThread().interrupt(); 2068 throw new ServiceException(t); 2069 } catch (IOException e) { 2070 throw new ServiceException(e); 2071 } 2072 } 2073 } 2074 2075 long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1; 2076 2077 for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) { 2078 final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion()); 2079 TableDescriptor htd; 2080 try { 2081 String encodedName = region.getEncodedName(); 2082 byte[] encodedNameBytes = region.getEncodedNameAsBytes(); 2083 final HRegion onlineRegion = regionServer.getRegion(encodedName); 2084 if (onlineRegion != null) { 2085 // The region is already online. This should not happen any more. 2086 String error = "Received OPEN for the region:" + region.getRegionNameAsString() 2087 + ", which is already online"; 2088 LOG.warn(error); 2089 // regionServer.abort(error); 2090 // throw new IOException(error); 2091 builder.addOpeningState(RegionOpeningState.OPENED); 2092 continue; 2093 } 2094 LOG.info("Open " + region.getRegionNameAsString()); 2095 2096 final Boolean previous = 2097 regionServer.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE); 2098 2099 if (Boolean.FALSE.equals(previous)) { 2100 if (regionServer.getRegion(encodedName) != null) { 2101 // There is a close in progress. This should not happen any more. 
2102 String error = "Received OPEN for the region:" + region.getRegionNameAsString() 2103 + ", which we are already trying to CLOSE"; 2104 regionServer.abort(error); 2105 throw new IOException(error); 2106 } 2107 regionServer.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE); 2108 } 2109 2110 if (Boolean.TRUE.equals(previous)) { 2111 // An open is in progress. This is supported, but let's log this. 2112 LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString() 2113 + ", which we are already trying to OPEN" 2114 + " - ignoring this new request for this region."); 2115 } 2116 2117 // We are opening this region. If it moves back and forth for whatever reason, we don't 2118 // want to keep returning the stale moved record while we are opening/if we close again. 2119 regionServer.removeFromMovedRegions(region.getEncodedName()); 2120 2121 if (previous == null || !previous.booleanValue()) { 2122 htd = htds.get(region.getTable()); 2123 if (htd == null) { 2124 htd = regionServer.tableDescriptors.get(region.getTable()); 2125 htds.put(region.getTable(), htd); 2126 } 2127 if (htd == null) { 2128 throw new IOException("Missing table descriptor for " + region.getEncodedName()); 2129 } 2130 // If there is no action in progress, we can submit a specific handler. 2131 // Need to pass the expected version in the constructor. 
2132 if (regionServer.executorService == null) { 2133 LOG.info("No executor executorService; skipping open request"); 2134 } else { 2135 if (region.isMetaRegion()) { 2136 regionServer.executorService.submit( 2137 new OpenMetaHandler(regionServer, regionServer, region, htd, masterSystemTime)); 2138 } else { 2139 if (regionOpenInfo.getFavoredNodesCount() > 0) { 2140 regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(), 2141 regionOpenInfo.getFavoredNodesList()); 2142 } 2143 if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) { 2144 regionServer.executorService.submit(new OpenPriorityRegionHandler(regionServer, 2145 regionServer, region, htd, masterSystemTime)); 2146 } else { 2147 regionServer.executorService.submit( 2148 new OpenRegionHandler(regionServer, regionServer, region, htd, masterSystemTime)); 2149 } 2150 } 2151 } 2152 } 2153 2154 builder.addOpeningState(RegionOpeningState.OPENED); 2155 } catch (IOException ie) { 2156 LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie); 2157 if (isBulkAssign) { 2158 builder.addOpeningState(RegionOpeningState.FAILED_OPENING); 2159 } else { 2160 throw new ServiceException(ie); 2161 } 2162 } 2163 } 2164 return builder.build(); 2165 } 2166 2167 /** 2168 * Warmup a region on this server. This method should only be called by Master. It synchronously 2169 * opens the region and closes the region bringing the most important pages in cache. 
2170 */ 2171 @Override 2172 public WarmupRegionResponse warmupRegion(final RpcController controller, 2173 final WarmupRegionRequest request) throws ServiceException { 2174 final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo()); 2175 WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance(); 2176 try { 2177 checkOpen(); 2178 String encodedName = region.getEncodedName(); 2179 byte[] encodedNameBytes = region.getEncodedNameAsBytes(); 2180 final HRegion onlineRegion = regionServer.getRegion(encodedName); 2181 if (onlineRegion != null) { 2182 LOG.info("{} is online; skipping warmup", region); 2183 return response; 2184 } 2185 TableDescriptor htd = regionServer.tableDescriptors.get(region.getTable()); 2186 if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) { 2187 LOG.info("{} is in transition; skipping warmup", region); 2188 return response; 2189 } 2190 LOG.info("Warmup {}", region.getRegionNameAsString()); 2191 HRegion.warmupHRegion(region, htd, regionServer.getWAL(region), 2192 regionServer.getConfiguration(), regionServer, null); 2193 } catch (IOException ie) { 2194 LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie); 2195 throw new ServiceException(ie); 2196 } 2197 2198 return response; 2199 } 2200 2201 /** 2202 * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is 2203 * that the given mutations will be durable on the receiving RS if this method returns without any 2204 * exception. 
2205 * @param controller the RPC controller 2206 * @param request the request 2207 */ 2208 @Override 2209 @QosPriority(priority = HConstants.REPLAY_QOS) 2210 public ReplicateWALEntryResponse replay(final RpcController controller, 2211 final ReplicateWALEntryRequest request) throws ServiceException { 2212 long before = EnvironmentEdgeManager.currentTime(); 2213 CellScanner cells = ((HBaseRpcController) controller).cellScanner(); 2214 ((HBaseRpcController) controller).setCellScanner(null); 2215 try { 2216 checkOpen(); 2217 List<WALEntry> entries = request.getEntryList(); 2218 if (entries == null || entries.isEmpty()) { 2219 // empty input 2220 return ReplicateWALEntryResponse.newBuilder().build(); 2221 } 2222 ByteString regionName = entries.get(0).getKey().getEncodedRegionName(); 2223 HRegion region = regionServer.getRegionByEncodedName(regionName.toStringUtf8()); 2224 RegionCoprocessorHost coprocessorHost = 2225 ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo()) 2226 ? region.getCoprocessorHost() 2227 : null; // do not invoke coprocessors if this is a secondary region replica 2228 List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>(); 2229 2230 // Skip adding the edits to WAL if this is a secondary region replica 2231 boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); 2232 Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL; 2233 2234 for (WALEntry entry : entries) { 2235 if (!regionName.equals(entry.getKey().getEncodedRegionName())) { 2236 throw new NotServingRegionException("Replay request contains entries from multiple " 2237 + "regions. First region:" + regionName.toStringUtf8() + " , other region:" 2238 + entry.getKey().getEncodedRegionName()); 2239 } 2240 if (regionServer.nonceManager != null && isPrimary) { 2241 long nonceGroup = 2242 entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; 2243 long nonce = entry.getKey().hasNonce() ? 
entry.getKey().getNonce() : HConstants.NO_NONCE; 2244 regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce, 2245 entry.getKey().getWriteTime()); 2246 } 2247 Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>(); 2248 List<MutationReplay> edits = 2249 WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability); 2250 if (coprocessorHost != null) { 2251 // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a 2252 // KeyValue. 2253 if ( 2254 coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(), 2255 walEntry.getSecond()) 2256 ) { 2257 // if bypass this log entry, ignore it ... 2258 continue; 2259 } 2260 walEntries.add(walEntry); 2261 } 2262 if (edits != null && !edits.isEmpty()) { 2263 // HBASE-17924 2264 // sort to improve lock efficiency 2265 Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation)); 2266 long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) 2267 ? 
entry.getKey().getOrigSequenceNumber() 2268 : entry.getKey().getLogSequenceNumber(); 2269 OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId); 2270 // check if it's a partial success 2271 for (int i = 0; result != null && i < result.length; i++) { 2272 if (result[i] != OperationStatus.SUCCESS) { 2273 throw new IOException(result[i].getExceptionMsg()); 2274 } 2275 } 2276 } 2277 } 2278 2279 // sync wal at the end because ASYNC_WAL is used above 2280 WAL wal = region.getWAL(); 2281 if (wal != null) { 2282 wal.sync(); 2283 } 2284 2285 if (coprocessorHost != null) { 2286 for (Pair<WALKey, WALEdit> entry : walEntries) { 2287 coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(), 2288 entry.getSecond()); 2289 } 2290 } 2291 return ReplicateWALEntryResponse.newBuilder().build(); 2292 } catch (IOException ie) { 2293 throw new ServiceException(ie); 2294 } finally { 2295 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 2296 if (metricsRegionServer != null) { 2297 metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before); 2298 } 2299 } 2300 } 2301 2302 /** 2303 * Replicate WAL entries on the region server. 
2304 * @param controller the RPC controller 2305 * @param request the request 2306 */ 2307 @Override 2308 @QosPriority(priority = HConstants.REPLICATION_QOS) 2309 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, 2310 final ReplicateWALEntryRequest request) throws ServiceException { 2311 try { 2312 checkOpen(); 2313 if (regionServer.getReplicationSinkService() != null) { 2314 requestCount.increment(); 2315 List<WALEntry> entries = request.getEntryList(); 2316 CellScanner cellScanner = ((HBaseRpcController) controller).cellScanner(); 2317 ((HBaseRpcController) controller).setCellScanner(null); 2318 regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(); 2319 regionServer.getReplicationSinkService().replicateLogEntries(entries, cellScanner, 2320 request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), 2321 request.getSourceHFileArchiveDirPath()); 2322 regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(); 2323 return ReplicateWALEntryResponse.newBuilder().build(); 2324 } else { 2325 throw new ServiceException("Replication services are not initialized yet"); 2326 } 2327 } catch (IOException ie) { 2328 throw new ServiceException(ie); 2329 } 2330 } 2331 2332 /** 2333 * Roll the WAL writer of the region server. 
2334 * @param controller the RPC controller 2335 * @param request the request 2336 */ 2337 @Override 2338 public RollWALWriterResponse rollWALWriter(final RpcController controller, 2339 final RollWALWriterRequest request) throws ServiceException { 2340 try { 2341 checkOpen(); 2342 requestCount.increment(); 2343 regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest(); 2344 regionServer.getWalRoller().requestRollAll(); 2345 regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest(); 2346 RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder(); 2347 return builder.build(); 2348 } catch (IOException ie) { 2349 throw new ServiceException(ie); 2350 } 2351 } 2352 2353 /** 2354 * Stop the region server. 2355 * @param controller the RPC controller 2356 * @param request the request 2357 */ 2358 @Override 2359 @QosPriority(priority = HConstants.ADMIN_QOS) 2360 public StopServerResponse stopServer(final RpcController controller, 2361 final StopServerRequest request) throws ServiceException { 2362 rpcPreCheck("stopServer"); 2363 requestCount.increment(); 2364 String reason = request.getReason(); 2365 regionServer.stop(reason); 2366 return StopServerResponse.newBuilder().build(); 2367 } 2368 2369 @Override 2370 public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller, 2371 UpdateFavoredNodesRequest request) throws ServiceException { 2372 rpcPreCheck("updateFavoredNodes"); 2373 List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList(); 2374 UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder(); 2375 for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) { 2376 RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion()); 2377 if (regionUpdateInfo.getFavoredNodesCount() > 0) { 2378 regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(), 2379 regionUpdateInfo.getFavoredNodesList()); 2380 } 2381 } 
2382 respBuilder.setResponse(openInfoList.size()); 2383 return respBuilder.build(); 2384 } 2385 2386 /** 2387 * Atomically bulk load several HFiles into an open region 2388 * @return true if successful, false is failed but recoverably (no action) 2389 * @throws ServiceException if failed unrecoverably 2390 */ 2391 @Override 2392 public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller, 2393 final BulkLoadHFileRequest request) throws ServiceException { 2394 long start = EnvironmentEdgeManager.currentTime(); 2395 List<String> clusterIds = new ArrayList<>(request.getClusterIdsList()); 2396 if (clusterIds.contains(this.regionServer.clusterId)) { 2397 return BulkLoadHFileResponse.newBuilder().setLoaded(true).build(); 2398 } else { 2399 clusterIds.add(this.regionServer.clusterId); 2400 } 2401 try { 2402 checkOpen(); 2403 requestCount.increment(); 2404 HRegion region = getRegion(request.getRegion()); 2405 Map<byte[], List<Path>> map = null; 2406 final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration()); 2407 long sizeToBeLoaded = -1; 2408 2409 // Check to see if this bulk load would exceed the space quota for this table 2410 if (spaceQuotaEnabled) { 2411 ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements(); 2412 SpaceViolationPolicyEnforcement enforcement = 2413 activeSpaceQuotas.getPolicyEnforcement(region); 2414 if (enforcement != null) { 2415 // Bulk loads must still be atomic. We must enact all or none. 
2416 List<String> filePaths = new ArrayList<>(request.getFamilyPathCount()); 2417 for (FamilyPath familyPath : request.getFamilyPathList()) { 2418 filePaths.add(familyPath.getPath()); 2419 } 2420 // Check if the batch of files exceeds the current quota 2421 sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths); 2422 } 2423 } 2424 2425 List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount()); 2426 for (FamilyPath familyPath : request.getFamilyPathList()) { 2427 familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath())); 2428 } 2429 if (!request.hasBulkToken()) { 2430 if (region.getCoprocessorHost() != null) { 2431 region.getCoprocessorHost().preBulkLoadHFile(familyPaths); 2432 } 2433 try { 2434 map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null, 2435 request.getCopyFile(), clusterIds, request.getReplicate()); 2436 } finally { 2437 if (region.getCoprocessorHost() != null) { 2438 region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map); 2439 } 2440 } 2441 } else { 2442 // secure bulk load 2443 map = 2444 regionServer.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds); 2445 } 2446 BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder(); 2447 builder.setLoaded(map != null); 2448 if (map != null) { 2449 // Treat any negative size as a flag to "ignore" updating the region size as that is 2450 // not possible to occur in real life (cannot bulk load a file with negative size) 2451 if (spaceQuotaEnabled && sizeToBeLoaded > 0) { 2452 if (LOG.isTraceEnabled()) { 2453 LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by " 2454 + sizeToBeLoaded + " bytes"); 2455 } 2456 // Inform space quotas of the new files for this region 2457 getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(), 2458 sizeToBeLoaded); 2459 } 2460 } 2461 return builder.build(); 2462 } catch 
(IOException ie) { 2463 throw new ServiceException(ie); 2464 } finally { 2465 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 2466 if (metricsRegionServer != null) { 2467 metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start); 2468 } 2469 } 2470 } 2471 2472 @Override 2473 public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller, 2474 PrepareBulkLoadRequest request) throws ServiceException { 2475 try { 2476 checkOpen(); 2477 requestCount.increment(); 2478 2479 HRegion region = getRegion(request.getRegion()); 2480 2481 String bulkToken = regionServer.getSecureBulkLoadManager().prepareBulkLoad(region, request); 2482 PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder(); 2483 builder.setBulkToken(bulkToken); 2484 return builder.build(); 2485 } catch (IOException ie) { 2486 throw new ServiceException(ie); 2487 } 2488 } 2489 2490 @Override 2491 public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller, 2492 CleanupBulkLoadRequest request) throws ServiceException { 2493 try { 2494 checkOpen(); 2495 requestCount.increment(); 2496 2497 HRegion region = getRegion(request.getRegion()); 2498 2499 regionServer.getSecureBulkLoadManager().cleanupBulkLoad(region, request); 2500 return CleanupBulkLoadResponse.newBuilder().build(); 2501 } catch (IOException ie) { 2502 throw new ServiceException(ie); 2503 } 2504 } 2505 2506 @Override 2507 public CoprocessorServiceResponse execService(final RpcController controller, 2508 final CoprocessorServiceRequest request) throws ServiceException { 2509 try { 2510 checkOpen(); 2511 requestCount.increment(); 2512 HRegion region = getRegion(request.getRegion()); 2513 com.google.protobuf.Message result = execServiceOnRegion(region, request.getCall()); 2514 CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder(); 2515 builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, 2516 
region.getRegionInfo().getRegionName())); 2517 // TODO: COPIES!!!!!! 2518 builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue( 2519 org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray()))); 2520 return builder.build(); 2521 } catch (IOException ie) { 2522 throw new ServiceException(ie); 2523 } 2524 } 2525 2526 private FileSystem getFileSystem(List<String> filePaths) throws IOException { 2527 if (filePaths.isEmpty()) { 2528 // local hdfs 2529 return regionServer.getFileSystem(); 2530 } 2531 // source hdfs 2532 return new Path(filePaths.get(0)).getFileSystem(regionServer.getConfiguration()); 2533 } 2534 2535 private com.google.protobuf.Message execServiceOnRegion(HRegion region, 2536 final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException { 2537 // ignore the passed in controller (from the serialized call) 2538 ServerRpcController execController = new ServerRpcController(); 2539 return region.execService(execController, serviceCall); 2540 } 2541 2542 /** 2543 * Get data from a table. 2544 * @param controller the RPC controller 2545 * @param request the get request 2546 */ 2547 @Override 2548 public GetResponse get(final RpcController controller, final GetRequest request) 2549 throws ServiceException { 2550 long before = EnvironmentEdgeManager.currentTime(); 2551 OperationQuota quota = null; 2552 HRegion region = null; 2553 RpcCallContext context = RpcServer.getCurrentCall().orElse(null); 2554 try { 2555 checkOpen(); 2556 requestCount.increment(); 2557 rpcGetRequestCount.increment(); 2558 region = getRegion(request.getRegion()); 2559 2560 GetResponse.Builder builder = GetResponse.newBuilder(); 2561 ClientProtos.Get get = request.getGet(); 2562 // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do 2563 // a get closest before. 
Throwing the UnknownProtocolException signals it that it needs 2564 // to switch and do hbase2 protocol (HBase servers do not tell clients what versions 2565 // they are; its a problem for non-native clients like asynchbase. HBASE-20225. 2566 if (get.hasClosestRowBefore() && get.getClosestRowBefore()) { 2567 throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? " 2568 + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " 2569 + "reverse Scan."); 2570 } 2571 Boolean existence = null; 2572 Result r = null; 2573 quota = getRpcQuotaManager().checkBatchQuota(region, OperationQuota.OperationType.GET); 2574 2575 Get clientGet = ProtobufUtil.toGet(get); 2576 if (get.getExistenceOnly() && region.getCoprocessorHost() != null) { 2577 existence = region.getCoprocessorHost().preExists(clientGet); 2578 } 2579 if (existence == null) { 2580 if (context != null) { 2581 r = get(clientGet, (region), null, context); 2582 } else { 2583 // for test purpose 2584 r = region.get(clientGet); 2585 } 2586 if (get.getExistenceOnly()) { 2587 boolean exists = r.getExists(); 2588 if (region.getCoprocessorHost() != null) { 2589 exists = region.getCoprocessorHost().postExists(clientGet, exists); 2590 } 2591 existence = exists; 2592 } 2593 } 2594 if (existence != null) { 2595 ClientProtos.Result pbr = 2596 ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0); 2597 builder.setResult(pbr); 2598 } else if (r != null) { 2599 ClientProtos.Result pbr; 2600 if ( 2601 isClientCellBlockSupport(context) && controller instanceof HBaseRpcController 2602 && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3) 2603 ) { 2604 pbr = ProtobufUtil.toResultNoData(r); 2605 ((HBaseRpcController) controller) 2606 .setCellScanner(CellUtil.createCellScanner(r.rawCells())); 2607 addSize(context, r); 2608 } else { 2609 pbr = ProtobufUtil.toResult(r); 2610 } 2611 builder.setResult(pbr); 2612 } 2613 // r.cells is null when an 
table.exists(get) call 2614 if (r != null && r.rawCells() != null) { 2615 quota.addGetResult(r); 2616 } 2617 return builder.build(); 2618 } catch (IOException ie) { 2619 throw new ServiceException(ie); 2620 } finally { 2621 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 2622 if (metricsRegionServer != null && region != null) { 2623 long blockBytesScanned = context != null ? context.getBlockBytesScanned() : 0; 2624 metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before, 2625 blockBytesScanned); 2626 } 2627 if (quota != null) { 2628 quota.close(); 2629 } 2630 } 2631 } 2632 2633 private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack, 2634 RpcCallContext context) throws IOException { 2635 region.prepareGet(get); 2636 boolean stale = region.getRegionInfo().getReplicaId() != 0; 2637 2638 // This method is almost the same as HRegion#get. 2639 List<Cell> results = new ArrayList<>(); 2640 long before = EnvironmentEdgeManager.currentTime(); 2641 // pre-get CP hook 2642 if (region.getCoprocessorHost() != null) { 2643 if (region.getCoprocessorHost().preGet(get, results)) { 2644 region.metricsUpdateForGet(results, before); 2645 return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, 2646 stale); 2647 } 2648 } 2649 Scan scan = new Scan(get); 2650 if (scan.getLoadColumnFamiliesOnDemandValue() == null) { 2651 scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); 2652 } 2653 RegionScannerImpl scanner = null; 2654 try { 2655 scanner = region.getScanner(scan); 2656 scanner.next(results); 2657 } finally { 2658 if (scanner != null) { 2659 if (closeCallBack == null) { 2660 // If there is a context then the scanner can be added to the current 2661 // RpcCallContext. 
The rpc callback will take care of closing the 2662 // scanner, for eg in case 2663 // of get() 2664 context.setCallBack(scanner); 2665 } else { 2666 // The call is from multi() where the results from the get() are 2667 // aggregated and then send out to the 2668 // rpc. The rpccall back will close all such scanners created as part 2669 // of multi(). 2670 closeCallBack.addScanner(scanner); 2671 } 2672 } 2673 } 2674 2675 // post-get CP hook 2676 if (region.getCoprocessorHost() != null) { 2677 region.getCoprocessorHost().postGet(get, results); 2678 } 2679 region.metricsUpdateForGet(results, before); 2680 2681 return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); 2682 } 2683 2684 private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException { 2685 int sum = 0; 2686 String firstRegionName = null; 2687 for (RegionAction regionAction : request.getRegionActionList()) { 2688 if (sum == 0) { 2689 firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray()); 2690 } 2691 sum += regionAction.getActionCount(); 2692 } 2693 if (sum > rowSizeWarnThreshold) { 2694 LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold 2695 + ") (HBASE-18023)." 
+ " Requested Number of Rows: " + sum + " Client: " 2696 + RpcServer.getRequestUserName().orElse(null) + "/" 2697 + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName); 2698 if (rejectRowsWithSizeOverThreshold) { 2699 throw new ServiceException( 2700 "Rejecting large batch operation for current batch with firstRegionName: " 2701 + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: " 2702 + rowSizeWarnThreshold); 2703 } 2704 } 2705 } 2706 2707 private void failRegionAction(MultiResponse.Builder responseBuilder, 2708 RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction, 2709 CellScanner cellScanner, Throwable error) { 2710 rpcServer.getMetrics().exception(error); 2711 regionActionResultBuilder.setException(ResponseConverter.buildException(error)); 2712 responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); 2713 // All Mutations in this RegionAction not executed as we can not see the Region online here 2714 // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner 2715 // corresponding to these Mutations. 2716 if (cellScanner != null) { 2717 skipCellsForMutations(regionAction.getActionList(), cellScanner); 2718 } 2719 } 2720 2721 /** 2722 * Execute multiple actions on a table: get, mutate, and/or execCoprocessor 2723 * @param rpcc the RPC controller 2724 * @param request the multi request 2725 */ 2726 @Override 2727 public MultiResponse multi(final RpcController rpcc, final MultiRequest request) 2728 throws ServiceException { 2729 try { 2730 checkOpen(); 2731 } catch (IOException ie) { 2732 throw new ServiceException(ie); 2733 } 2734 2735 checkBatchSizeAndLogLargeSize(request); 2736 2737 // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data. 2738 // It is also the conduit via which we pass back data. 
2739 HBaseRpcController controller = (HBaseRpcController) rpcc; 2740 CellScanner cellScanner = controller != null ? controller.cellScanner() : null; 2741 if (controller != null) { 2742 controller.setCellScanner(null); 2743 } 2744 2745 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; 2746 2747 MultiResponse.Builder responseBuilder = MultiResponse.newBuilder(); 2748 RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder(); 2749 this.rpcMultiRequestCount.increment(); 2750 this.requestCount.increment(); 2751 ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements(); 2752 2753 // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The 2754 // following logic is for backward compatibility as old clients still use 2755 // MultiRequest#condition in case of checkAndMutate with RowMutations. 2756 if (request.hasCondition()) { 2757 if (request.getRegionActionList().isEmpty()) { 2758 // If the region action list is empty, do nothing. 2759 responseBuilder.setProcessed(true); 2760 return responseBuilder.build(); 2761 } 2762 2763 RegionAction regionAction = request.getRegionAction(0); 2764 2765 // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So 2766 // we can assume regionAction.getAtomic() is true here. 
2767 assert regionAction.getAtomic(); 2768 2769 OperationQuota quota; 2770 HRegion region; 2771 RegionSpecifier regionSpecifier = regionAction.getRegion(); 2772 2773 try { 2774 region = getRegion(regionSpecifier); 2775 quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(), 2776 regionAction.hasCondition()); 2777 } catch (IOException e) { 2778 failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e); 2779 return responseBuilder.build(); 2780 } 2781 2782 try { 2783 CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(), 2784 cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement); 2785 responseBuilder.setProcessed(result.isSuccess()); 2786 ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = 2787 ClientProtos.ResultOrException.newBuilder(); 2788 for (int i = 0; i < regionAction.getActionCount(); i++) { 2789 // To unify the response format with doNonAtomicRegionMutation and read through 2790 // client's AsyncProcess we have to add an empty result instance per operation 2791 resultOrExceptionOrBuilder.clear(); 2792 resultOrExceptionOrBuilder.setIndex(i); 2793 regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build()); 2794 } 2795 } catch (IOException e) { 2796 rpcServer.getMetrics().exception(e); 2797 // As it's an atomic operation with a condition, we may expect it's a global failure. 
2798 regionActionResultBuilder.setException(ResponseConverter.buildException(e)); 2799 } finally { 2800 quota.close(); 2801 } 2802 2803 responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); 2804 ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics(); 2805 if (regionLoadStats != null) { 2806 responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder() 2807 .addRegion(regionSpecifier).addStat(regionLoadStats).build()); 2808 } 2809 return responseBuilder.build(); 2810 } 2811 2812 // this will contain all the cells that we need to return. It's created later, if needed. 2813 List<CellScannable> cellsToReturn = null; 2814 RegionScannersCloseCallBack closeCallBack = null; 2815 RpcCallContext context = RpcServer.getCurrentCall().orElse(null); 2816 Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = 2817 new HashMap<>(request.getRegionActionCount()); 2818 2819 for (RegionAction regionAction : request.getRegionActionList()) { 2820 OperationQuota quota; 2821 HRegion region; 2822 RegionSpecifier regionSpecifier = regionAction.getRegion(); 2823 regionActionResultBuilder.clear(); 2824 2825 try { 2826 region = getRegion(regionSpecifier); 2827 quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(), 2828 regionAction.hasCondition()); 2829 } catch (IOException e) { 2830 failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e); 2831 continue; // For this region it's a failure. 
2832 } 2833 2834 try { 2835 if (regionAction.hasCondition()) { 2836 try { 2837 ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = 2838 ClientProtos.ResultOrException.newBuilder(); 2839 if (regionAction.getActionCount() == 1) { 2840 CheckAndMutateResult result = 2841 checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner, 2842 regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement, context); 2843 regionActionResultBuilder.setProcessed(result.isSuccess()); 2844 resultOrExceptionOrBuilder.setIndex(0); 2845 if (result.getResult() != null) { 2846 resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult())); 2847 } 2848 regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build()); 2849 } else { 2850 CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(), 2851 cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement); 2852 regionActionResultBuilder.setProcessed(result.isSuccess()); 2853 for (int i = 0; i < regionAction.getActionCount(); i++) { 2854 if (i == 0 && result.getResult() != null) { 2855 // Set the result of the Increment/Append operations to the first element of the 2856 // ResultOrException list 2857 resultOrExceptionOrBuilder.setIndex(i); 2858 regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder 2859 .setResult(ProtobufUtil.toResult(result.getResult())).build()); 2860 continue; 2861 } 2862 // To unify the response format with doNonAtomicRegionMutation and read through 2863 // client's AsyncProcess we have to add an empty result instance per operation 2864 resultOrExceptionOrBuilder.clear(); 2865 resultOrExceptionOrBuilder.setIndex(i); 2866 regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build()); 2867 } 2868 } 2869 } catch (IOException e) { 2870 rpcServer.getMetrics().exception(e); 2871 // As it's an atomic operation with a condition, we may expect it's a global failure. 
2872 regionActionResultBuilder.setException(ResponseConverter.buildException(e)); 2873 } 2874 } else if (regionAction.hasAtomic() && regionAction.getAtomic()) { 2875 try { 2876 doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(), 2877 cellScanner, nonceGroup, spaceQuotaEnforcement); 2878 regionActionResultBuilder.setProcessed(true); 2879 // We no longer use MultiResponse#processed. Instead, we use 2880 // RegionActionResult#processed. This is for backward compatibility for old clients. 2881 responseBuilder.setProcessed(true); 2882 } catch (IOException e) { 2883 rpcServer.getMetrics().exception(e); 2884 // As it's atomic, we may expect it's a global failure. 2885 regionActionResultBuilder.setException(ResponseConverter.buildException(e)); 2886 } 2887 } else { 2888 // doNonAtomicRegionMutation manages the exception internally 2889 if (context != null && closeCallBack == null) { 2890 // An RpcCallBack that creates a list of scanners that needs to perform callBack 2891 // operation on completion of multiGets. 2892 // Set this only once 2893 closeCallBack = new RegionScannersCloseCallBack(); 2894 context.setCallBack(closeCallBack); 2895 } 2896 cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner, 2897 regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context, 2898 spaceQuotaEnforcement); 2899 } 2900 } finally { 2901 quota.close(); 2902 } 2903 2904 responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); 2905 ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics(); 2906 if (regionLoadStats != null) { 2907 regionStats.put(regionSpecifier, regionLoadStats); 2908 } 2909 } 2910 // Load the controller with the Cells to return. 
2911 if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) { 2912 controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn)); 2913 } 2914 2915 MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder(); 2916 for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) { 2917 builder.addRegion(stat.getKey()); 2918 builder.addStat(stat.getValue()); 2919 } 2920 responseBuilder.setRegionStatistics(builder); 2921 return responseBuilder.build(); 2922 } 2923 2924 private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) { 2925 if (cellScanner == null) { 2926 return; 2927 } 2928 for (Action action : actions) { 2929 skipCellsForMutation(action, cellScanner); 2930 } 2931 } 2932 2933 private void skipCellsForMutation(Action action, CellScanner cellScanner) { 2934 if (cellScanner == null) { 2935 return; 2936 } 2937 try { 2938 if (action.hasMutation()) { 2939 MutationProto m = action.getMutation(); 2940 if (m.hasAssociatedCellCount()) { 2941 for (int i = 0; i < m.getAssociatedCellCount(); i++) { 2942 cellScanner.advance(); 2943 } 2944 } 2945 } 2946 } catch (IOException e) { 2947 // No need to handle these Individual Muatation level issue. Any way this entire RegionAction 2948 // marked as failed as we could not see the Region here. At client side the top level 2949 // RegionAction exception will be considered first. 2950 LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e); 2951 } 2952 } 2953 2954 /** 2955 * Mutate data in a table. 2956 * @param rpcc the RPC controller 2957 * @param request the mutate request 2958 */ 2959 @Override 2960 public MutateResponse mutate(final RpcController rpcc, final MutateRequest request) 2961 throws ServiceException { 2962 // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data. 2963 // It is also the conduit via which we pass back data. 
2964 HBaseRpcController controller = (HBaseRpcController) rpcc; 2965 CellScanner cellScanner = controller != null ? controller.cellScanner() : null; 2966 OperationQuota quota = null; 2967 RpcCallContext context = RpcServer.getCurrentCall().orElse(null); 2968 // Clear scanner so we are not holding on to reference across call. 2969 if (controller != null) { 2970 controller.setCellScanner(null); 2971 } 2972 try { 2973 checkOpen(); 2974 requestCount.increment(); 2975 rpcMutateRequestCount.increment(); 2976 HRegion region = getRegion(request.getRegion()); 2977 MutateResponse.Builder builder = MutateResponse.newBuilder(); 2978 MutationProto mutation = request.getMutation(); 2979 if (!region.getRegionInfo().isMetaRegion()) { 2980 regionServer.getMemStoreFlusher().reclaimMemStoreMemory(); 2981 } 2982 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; 2983 OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request); 2984 quota = getRpcQuotaManager().checkBatchQuota(region, operationType); 2985 ActivePolicyEnforcement spaceQuotaEnforcement = 2986 getSpaceQuotaManager().getActiveEnforcements(); 2987 2988 if (request.hasCondition()) { 2989 CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner, 2990 request.getCondition(), nonceGroup, spaceQuotaEnforcement, context); 2991 builder.setProcessed(result.isSuccess()); 2992 boolean clientCellBlockSupported = isClientCellBlockSupport(context); 2993 addResult(builder, result.getResult(), controller, clientCellBlockSupported); 2994 if (clientCellBlockSupported) { 2995 addSize(context, result.getResult()); 2996 } 2997 } else { 2998 Result r = null; 2999 Boolean processed = null; 3000 MutationType type = mutation.getMutateType(); 3001 switch (type) { 3002 case APPEND: 3003 // TODO: this doesn't actually check anything. 
3004 r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement, 3005 context); 3006 break; 3007 case INCREMENT: 3008 // TODO: this doesn't actually check anything. 3009 r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement, 3010 context); 3011 break; 3012 case PUT: 3013 put(region, quota, mutation, cellScanner, spaceQuotaEnforcement); 3014 processed = Boolean.TRUE; 3015 break; 3016 case DELETE: 3017 delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement); 3018 processed = Boolean.TRUE; 3019 break; 3020 default: 3021 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); 3022 } 3023 if (processed != null) { 3024 builder.setProcessed(processed); 3025 } 3026 boolean clientCellBlockSupported = isClientCellBlockSupport(context); 3027 addResult(builder, r, controller, clientCellBlockSupported); 3028 if (clientCellBlockSupported) { 3029 addSize(context, r); 3030 } 3031 } 3032 return builder.build(); 3033 } catch (IOException ie) { 3034 regionServer.checkFileSystem(); 3035 throw new ServiceException(ie); 3036 } finally { 3037 if (quota != null) { 3038 quota.close(); 3039 } 3040 } 3041 } 3042 3043 private void put(HRegion region, OperationQuota quota, MutationProto mutation, 3044 CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException { 3045 long before = EnvironmentEdgeManager.currentTime(); 3046 Put put = ProtobufUtil.toPut(mutation, cellScanner); 3047 checkCellSizeLimit(region, put); 3048 spaceQuota.getPolicyEnforcement(region).check(put); 3049 quota.addMutation(put); 3050 region.put(put); 3051 3052 MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 3053 if (metricsRegionServer != null) { 3054 long after = EnvironmentEdgeManager.currentTime(); 3055 metricsRegionServer.updatePut(region, after - before); 3056 } 3057 } 3058 3059 private void delete(HRegion region, OperationQuota quota, MutationProto mutation, 3060 CellScanner cellScanner, 
ActivePolicyEnforcement spaceQuota) throws IOException { 3061 long before = EnvironmentEdgeManager.currentTime(); 3062 Delete delete = ProtobufUtil.toDelete(mutation, cellScanner); 3063 checkCellSizeLimit(region, delete); 3064 spaceQuota.getPolicyEnforcement(region).check(delete); 3065 quota.addMutation(delete); 3066 region.delete(delete); 3067 3068 MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 3069 if (metricsRegionServer != null) { 3070 long after = EnvironmentEdgeManager.currentTime(); 3071 metricsRegionServer.updateDelete(region, after - before); 3072 } 3073 } 3074 3075 private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota, 3076 MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup, 3077 ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException { 3078 long before = EnvironmentEdgeManager.currentTime(); 3079 long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0; 3080 CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner); 3081 long nonce = mutation.hasNonce() ? 
mutation.getNonce() : HConstants.NO_NONCE; 3082 checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction()); 3083 spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction()); 3084 quota.addMutation((Mutation) checkAndMutate.getAction()); 3085 3086 CheckAndMutateResult result = null; 3087 if (region.getCoprocessorHost() != null) { 3088 result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate); 3089 } 3090 if (result == null) { 3091 result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce); 3092 if (region.getCoprocessorHost() != null) { 3093 result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result); 3094 } 3095 } 3096 MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 3097 if (metricsRegionServer != null) { 3098 long after = EnvironmentEdgeManager.currentTime(); 3099 long blockBytesScanned = 3100 context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; 3101 metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned); 3102 3103 MutationType type = mutation.getMutateType(); 3104 switch (type) { 3105 case PUT: 3106 metricsRegionServer.updateCheckAndPut(region, after - before); 3107 break; 3108 case DELETE: 3109 metricsRegionServer.updateCheckAndDelete(region, after - before); 3110 break; 3111 default: 3112 break; 3113 } 3114 } 3115 return result; 3116 } 3117 3118 // This is used to keep compatible with the old client implementation. Consider remove it if we 3119 // decide to drop the support of the client that still sends close request to a region scanner 3120 // which has already been exhausted. 
3121 @Deprecated 3122 private static final IOException SCANNER_ALREADY_CLOSED = new IOException() { 3123 3124 private static final long serialVersionUID = -4305297078988180130L; 3125 3126 @Override 3127 public synchronized Throwable fillInStackTrace() { 3128 return this; 3129 } 3130 }; 3131 3132 private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException { 3133 String scannerName = toScannerName(request.getScannerId()); 3134 RegionScannerHolder rsh = this.scanners.get(scannerName); 3135 if (rsh == null) { 3136 // just ignore the next or close request if scanner does not exists. 3137 if (closedScanners.getIfPresent(scannerName) != null) { 3138 throw SCANNER_ALREADY_CLOSED; 3139 } else { 3140 LOG.warn("Client tried to access missing scanner " + scannerName); 3141 throw new UnknownScannerException( 3142 "Unknown scanner '" + scannerName + "'. This can happen due to any of the following " 3143 + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " 3144 + "long wait between consecutive client checkins, c) Server may be closing down, " 3145 + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " 3146 + "possible fix would be increasing the value of" 3147 + "'hbase.client.scanner.timeout.period' configuration."); 3148 } 3149 } 3150 RegionInfo hri = rsh.s.getRegionInfo(); 3151 // Yes, should be the same instance 3152 if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) { 3153 String msg = "Region has changed on the scanner " + scannerName + ": regionName=" 3154 + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r; 3155 LOG.warn(msg + ", closing..."); 3156 scanners.remove(scannerName); 3157 try { 3158 rsh.s.close(); 3159 } catch (IOException e) { 3160 LOG.warn("Getting exception closing " + scannerName, e); 3161 } finally { 3162 try { 3163 regionServer.getLeaseManager().cancelLease(scannerName); 3164 } catch (LeaseException e) { 3165 LOG.warn("Getting exception closing " + 
scannerName, e); 3166 } 3167 } 3168 throw new NotServingRegionException(msg); 3169 } 3170 return rsh; 3171 } 3172 3173 /** 3174 * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder 3175 * value. 3176 */ 3177 private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request, 3178 ScanResponse.Builder builder) throws IOException { 3179 HRegion region = getRegion(request.getRegion()); 3180 ClientProtos.Scan protoScan = request.getScan(); 3181 boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); 3182 Scan scan = ProtobufUtil.toScan(protoScan); 3183 // if the request doesn't set this, get the default region setting. 3184 if (!isLoadingCfsOnDemandSet) { 3185 scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); 3186 } 3187 3188 if (!scan.hasFamilies()) { 3189 // Adding all families to scanner 3190 for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) { 3191 scan.addFamily(family); 3192 } 3193 } 3194 if (region.getCoprocessorHost() != null) { 3195 // preScannerOpen is not allowed to return a RegionScanner. 
Only post hook can create a 3196 // wrapper for the core created RegionScanner 3197 region.getCoprocessorHost().preScannerOpen(scan); 3198 } 3199 RegionScannerImpl coreScanner = region.getScanner(scan); 3200 Shipper shipper = coreScanner; 3201 RegionScanner scanner = coreScanner; 3202 try { 3203 if (region.getCoprocessorHost() != null) { 3204 scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); 3205 } 3206 } catch (Exception e) { 3207 // Although region coprocessor is for advanced users and they should take care of the 3208 // implementation to not damage the HBase system, closing the scanner on exception here does 3209 // not have any bad side effect, so let's do it 3210 scanner.close(); 3211 throw e; 3212 } 3213 long scannerId = scannerIdGenerator.generateNewScannerId(); 3214 builder.setScannerId(scannerId); 3215 builder.setMvccReadPoint(scanner.getMvccReadPoint()); 3216 builder.setTtl(scannerLeaseTimeoutPeriod); 3217 String scannerName = toScannerName(scannerId); 3218 3219 boolean fullRegionScan = 3220 !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region); 3221 3222 return new Pair<String, RegionScannerHolder>(scannerName, 3223 addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan)); 3224 } 3225 3226 /** 3227 * The returned String is used as key doing look up of outstanding Scanners in this Servers' 3228 * this.scanners, the Map of outstanding scanners and their current state. 3229 * @param scannerId A scanner long id. 3230 * @return The long id as a String. 3231 */ 3232 private static String toScannerName(long scannerId) { 3233 return Long.toString(scannerId); 3234 } 3235 3236 private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh) 3237 throws OutOfOrderScannerNextException { 3238 // if nextCallSeq does not match throw Exception straight away. This needs to be 3239 // performed even before checking of Lease. 
3240 // See HBASE-5974 3241 if (request.hasNextCallSeq()) { 3242 long callSeq = request.getNextCallSeq(); 3243 if (!rsh.incNextCallSeq(callSeq)) { 3244 throw new OutOfOrderScannerNextException( 3245 "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: " 3246 + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request)); 3247 } 3248 } 3249 } 3250 3251 private void addScannerLeaseBack(LeaseManager.Lease lease) { 3252 try { 3253 regionServer.getLeaseManager().addLease(lease); 3254 } catch (LeaseStillHeldException e) { 3255 // should not happen as the scanner id is unique. 3256 throw new AssertionError(e); 3257 } 3258 } 3259 3260 // visible for testing only 3261 long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller, 3262 boolean allowHeartbeatMessages) { 3263 // Set the time limit to be half of the more restrictive timeout value (one of the 3264 // timeout values must be positive). In the event that both values are positive, the 3265 // more restrictive of the two is used to calculate the limit. 3266 if (allowHeartbeatMessages) { 3267 long now = EnvironmentEdgeManager.currentTime(); 3268 long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now); 3269 if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) { 3270 long timeLimitDelta; 3271 if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) { 3272 timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout); 3273 } else { 3274 timeLimitDelta = 3275 scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout; 3276 } 3277 3278 // Use half of whichever timeout value was more restrictive... But don't allow 3279 // the time limit to be less than the allowable minimum (could cause an 3280 // immediate timeout before scanning any data). 
3281 timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta); 3282 return now + timeLimitDelta; 3283 } 3284 } 3285 // Default value of timeLimit is negative to indicate no timeLimit should be 3286 // enforced. 3287 return -1L; 3288 } 3289 3290 private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) { 3291 long timeout; 3292 if (controller != null && controller.getCallTimeout() > 0) { 3293 timeout = controller.getCallTimeout(); 3294 } else if (rpcTimeout > 0) { 3295 timeout = rpcTimeout; 3296 } else { 3297 return -1; 3298 } 3299 if (call != null) { 3300 timeout -= (now - call.getReceiveTime()); 3301 } 3302 // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high. 3303 // return minimum value here in that case so we count this in calculating the final delta. 3304 return Math.max(minimumScanTimeLimitDelta, timeout); 3305 } 3306 3307 private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows, 3308 ScannerContext scannerContext, ScanResponse.Builder builder) { 3309 if (numOfCompleteRows >= limitOfRows) { 3310 if (LOG.isTraceEnabled()) { 3311 LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows 3312 + " scannerContext: " + scannerContext); 3313 } 3314 builder.setMoreResults(false); 3315 } 3316 } 3317 3318 // return whether we have more results in region. 3319 private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh, 3320 long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results, 3321 ScanResponse.Builder builder, RpcCall rpcCall) throws IOException { 3322 HRegion region = rsh.r; 3323 RegionScanner scanner = rsh.s; 3324 long maxResultSize; 3325 if (scanner.getMaxResultSize() > 0) { 3326 maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize); 3327 } else { 3328 maxResultSize = maxQuotaResultSize; 3329 } 3330 // This is cells inside a row. 
/**
 * Runs the core of one scan RPC: repeatedly calls nextRaw on the holder's scanner and appends
 * one Result per call that produced cells to {@code results}, until {@code maxResults} results
 * were collected, a size or time limit was reached, the row limit was reached, or the scanner
 * reported no more rows.
 * <p>
 * The outcome is recorded on {@code builder}: moreResultsInRegion is always set; moreResults is
 * set to false (through checkLimitOfRows) when the row limit is reached; a heartbeat flag,
 * cursor and scan metrics are set when applicable. Server side metrics are updated even when the
 * scan fails, and the coprocessor postScannerNext hook is invoked once the scan completed
 * without an exception.
 * @param controller         RPC controller, passed on to the time limit calculation
 * @param request            the scan request
 * @param rsh                holder of the region scanner and its region
 * @param maxQuotaResultSize result size limit from server configuration and quotas
 * @param maxResults         maximum number of Results to collect in this call
 * @param limitOfRows        row limit of the scan; values &lt;= 0 disable row limit tracking
 * @param results            list the Results are appended to; may already hold Results added by
 *                           a coprocessor
 * @param builder            response builder that receives the flags described above
 * @param rpcCall            current RPC call; may be null
 * @throws IOException if the region operation cannot be started or the scanner fails
 */
private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
  long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
  ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
  HRegion region = rsh.r;
  RegionScanner scanner = rsh.s;
  // A positive max result size on the scanner can only tighten the quota based limit.
  long maxResultSize;
  if (scanner.getMaxResultSize() > 0) {
    maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
  } else {
    maxResultSize = maxQuotaResultSize;
  }
  // This is cells inside a row. Default size is 10 so if many versions or many cfs,
  // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
  // arbitrary 32. TODO: keep record of general size of results being returned.
  ArrayList<Cell> values = new ArrayList<>(32);
  region.startRegionOperation(Operation.SCAN);
  long before = EnvironmentEdgeManager.currentTime();
  // Used to check if we've matched the row limit set on the Scan
  int numOfCompleteRows = 0;
  // Count of times we call nextRaw; can be > numOfCompleteRows.
  int numOfNextRawCalls = 0;
  try {
    int numOfResults = 0;
    synchronized (scanner) {
      // Results read from a region whose replica id is not 0 are flagged as stale.
      boolean stale = (region.getRegionInfo().getReplicaId() != 0);
      boolean clientHandlesPartials =
        request.hasClientHandlesPartials() && request.getClientHandlesPartials();
      boolean clientHandlesHeartbeats =
        request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();

      // On the server side we must ensure that the correct ordering of partial results is
      // returned to the client to allow them to properly reconstruct the partial results.
      // If the coprocessor host is adding to the result list, we cannot guarantee the
      // correct ordering of partial results and so we prevent partial results from being
      // formed.
      boolean serverGuaranteesOrderOfPartials = results.isEmpty();
      boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
      boolean moreRows = false;

      // Heartbeat messages occur when the processing of the ScanRequest exceeds a
      // certain time threshold on the server. When the time threshold is exceeded, the
      // server stops the scan and sends back whatever Results it has accumulated within
      // that time period (may be empty). Since heartbeat messages have the potential to
      // create partial Results (in the event that the timeout occurs in the middle of a
      // row), we must only generate heartbeat messages when the client can handle both
      // heartbeats AND partials
      boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;

      long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);

      // Limits may only be enforced in the middle of a row when the client can cope with it.
      final LimitScope sizeScope =
        allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
      final LimitScope timeScope =
        allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;

      boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();

      // Configure with limits for this RPC. Set keep progress true since size progress
      // towards size limit should be kept between calls to nextRaw
      ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
      // maxResultSize - either we can reach this much size for all cells(being read) data or sum
      // of heap size occupied by cells(being read). Cell data means its key and value parts.
      // maxQuotaResultSize - max results just from server side configuration and quotas, without
      // user's specified max. We use this for evaluating limits based on blocks (not cells).
      // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
      // cell or block size from maximum here so we adhere to total limits of request.
      // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
      // have accumulated block bytes. If not, this will be 0 for block size.
      long maxCellSize = maxResultSize;
      long maxBlockSize = maxQuotaResultSize;
      if (rpcCall != null) {
        maxBlockSize -= rpcCall.getBlockBytesScanned();
        maxCellSize -= rpcCall.getResponseCellSize();
      }

      contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
      contextBuilder.setBatchLimit(scanner.getBatch());
      contextBuilder.setTimeLimit(timeScope, timeLimit);
      contextBuilder.setTrackMetrics(trackMetrics);
      ScannerContext scannerContext = contextBuilder.build();
      boolean limitReached = false;
      while (numOfResults < maxResults) {
        // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
        // batch limit is a limit on the number of cells per Result. Thus, if progress is
        // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
        // reset the batch progress between nextRaw invocations since we don't want the
        // batch progress from previous calls to affect future calls
        scannerContext.setBatchProgress(0);
        assert values.isEmpty();

        // Collect values to be returned here
        moreRows = scanner.nextRaw(values, scannerContext);
        if (rpcCall == null) {
          // When there is no RpcCallContext, copy EC to heap, then the scanner would close.
          // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
          // buffers. See more details in HBASE-26036.
          CellUtil.cloneIfNecessary(values);
        }
        numOfNextRawCalls++;

        if (!values.isEmpty()) {
          if (limitOfRows > 0) {
            // First we need to check if the last result is partial and we have a row change. If
            // so then we need to increase the numOfCompleteRows.
            if (results.isEmpty()) {
              // Nothing collected in this call yet: compare against the row remembered on the
              // holder (rowOfLastPartialResult).
              if (
                rsh.rowOfLastPartialResult != null
                  && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
              ) {
                numOfCompleteRows++;
                checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                  builder);
              }
            } else {
              Result lastResult = results.get(results.size() - 1);
              if (
                lastResult.mayHaveMoreCellsInRow()
                  && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
              ) {
                numOfCompleteRows++;
                checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                  builder);
              }
            }
            // checkLimitOfRows sets moreResults to false once the row limit is reached; stop
            // before adding the cells just read.
            if (builder.hasMoreResults() && !builder.getMoreResults()) {
              break;
            }
          }
          boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
          Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
          results.add(r);
          numOfResults++;
          if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
            numOfCompleteRows++;
            checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
            if (builder.hasMoreResults() && !builder.getMoreResults()) {
              break;
            }
          }
        } else if (!moreRows && !results.isEmpty()) {
          // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
          // last result is false. Otherwise it's possible that: the first nextRaw returned
          // because BATCH_LIMIT_REACHED (BTW it happened to exhaust all cells of the scan), so
          // the last result's mayHaveMoreCellsInRow will be true, while the following nextRaw
          // will return with moreRows=false, which means moreResultsInRegion would be false. It
          // would be a contradictory state (HBASE-21206).
          int lastIdx = results.size() - 1;
          Result r = results.get(lastIdx);
          if (r.mayHaveMoreCellsInRow()) {
            results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
          }
        }
        boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
        boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
        boolean resultsLimitReached = numOfResults >= maxResults;
        limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

        if (limitReached || !moreRows) {
          // With block size limit, we may exceed size limit without collecting any results.
          // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
          // or cursor if results were collected, for example for cell size or heap size limits.
          boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
          // We only want to mark a ScanResponse as a heartbeat message in the event that
          // there are more values to be read server side. If there aren't more values,
          // marking it as a heartbeat is wasteful because the client will need to issue
          // another ScanRequest only to realize that they already have all the values
          if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
            // Heartbeat messages occur when the time limit has been reached, or size limit has
            // been reached before collecting any results. This can happen for heavily filtered
            // scans which scan over too many blocks.
            builder.setHeartbeatMessage(true);
            if (rsh.needCursor) {
              Cell cursorCell = scannerContext.getLastPeekedCell();
              if (cursorCell != null) {
                builder.setCursor(ProtobufUtil.toCursor(cursorCell));
              }
            }
          }
          break;
        }
        // Reuse the cell list for the next nextRaw call.
        values.clear();
      }
      if (rpcCall != null) {
        rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
      }
      builder.setMoreResultsInRegion(moreRows);
      // Check to see if the client requested that we track metrics server side. If the
      // client requested metrics, retrieve the metrics from the scanner context.
      if (trackMetrics) {
        // rather than increment yet another counter in StoreScanner, just set the value here
        // from block size progress before writing into the response
        scannerContext.getMetrics().countOfBlockBytesScanned
          .set(scannerContext.getBlockSizeProgress());
        if (rpcCall != null) {
          scannerContext.getMetrics().fsReadTime.set(rpcCall.getFsReadTime());
        }
        Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
        ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
        // A single pair builder is reused for every metric entry.
        NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();

        for (Entry<String, Long> entry : metrics.entrySet()) {
          pairBuilder.setName(entry.getKey());
          pairBuilder.setValue(entry.getValue());
          metricBuilder.addMetrics(pairBuilder.build());
        }

        builder.setScanMetrics(metricBuilder.build());
      }
    }
  } finally {
    region.closeRegionOperation();
    // Update serverside metrics, even on error.
    long end = EnvironmentEdgeManager.currentTime();
    long responseCellSize = 0;
    long blockBytesScanned = 0;
    if (rpcCall != null) {
      responseCellSize = rpcCall.getResponseCellSize();
      blockBytesScanned = rpcCall.getBlockBytesScanned();
      rsh.updateBlockBytesScanned(blockBytesScanned);
    }
    region.getMetrics().updateScanTime(end - before);
    final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
    if (metricsRegionServer != null) {
      metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned);
      metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls);
    }
  }
  // coprocessor postNext hook
  if (region.getCoprocessorHost() != null) {
    region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
  }
}
3531 long end = EnvironmentEdgeManager.currentTime(); 3532 long responseCellSize = 0; 3533 long blockBytesScanned = 0; 3534 if (rpcCall != null) { 3535 responseCellSize = rpcCall.getResponseCellSize(); 3536 blockBytesScanned = rpcCall.getBlockBytesScanned(); 3537 rsh.updateBlockBytesScanned(blockBytesScanned); 3538 } 3539 region.getMetrics().updateScanTime(end - before); 3540 final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); 3541 if (metricsRegionServer != null) { 3542 metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned); 3543 metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls); 3544 } 3545 } 3546 // coprocessor postNext hook 3547 if (region.getCoprocessorHost() != null) { 3548 region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true); 3549 } 3550 } 3551 3552 /** 3553 * Scan data in a table. 3554 * @param controller the RPC controller 3555 * @param request the scan request 3556 */ 3557 @Override 3558 public ScanResponse scan(final RpcController controller, final ScanRequest request) 3559 throws ServiceException { 3560 if (controller != null && !(controller instanceof HBaseRpcController)) { 3561 throw new UnsupportedOperationException( 3562 "We only do " + "HBaseRpcControllers! 
FIX IF A PROBLEM: " + controller); 3563 } 3564 if (!request.hasScannerId() && !request.hasScan()) { 3565 throw new ServiceException( 3566 new DoNotRetryIOException("Missing required input: scannerId or scan")); 3567 } 3568 try { 3569 checkOpen(); 3570 } catch (IOException e) { 3571 if (request.hasScannerId()) { 3572 String scannerName = toScannerName(request.getScannerId()); 3573 if (LOG.isDebugEnabled()) { 3574 LOG.debug( 3575 "Server shutting down and client tried to access missing scanner " + scannerName); 3576 } 3577 final LeaseManager leaseManager = regionServer.getLeaseManager(); 3578 if (leaseManager != null) { 3579 try { 3580 leaseManager.cancelLease(scannerName); 3581 } catch (LeaseException le) { 3582 // No problem, ignore 3583 if (LOG.isTraceEnabled()) { 3584 LOG.trace("Un-able to cancel lease of scanner. It could already be closed."); 3585 } 3586 } 3587 } 3588 } 3589 throw new ServiceException(e); 3590 } 3591 requestCount.increment(); 3592 rpcScanRequestCount.increment(); 3593 RegionScannerHolder rsh; 3594 ScanResponse.Builder builder = ScanResponse.newBuilder(); 3595 String scannerName; 3596 try { 3597 if (request.hasScannerId()) { 3598 // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000 3599 // for more details. 3600 long scannerId = request.getScannerId(); 3601 builder.setScannerId(scannerId); 3602 scannerName = toScannerName(scannerId); 3603 rsh = getRegionScanner(request); 3604 } else { 3605 Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder); 3606 scannerName = scannerNameAndRSH.getFirst(); 3607 rsh = scannerNameAndRSH.getSecond(); 3608 } 3609 } catch (IOException e) { 3610 if (e == SCANNER_ALREADY_CLOSED) { 3611 // Now we will close scanner automatically if there are no more results for this region but 3612 // the old client will still send a close request to us. Just ignore it and return. 
3613 return builder.build(); 3614 } 3615 throw new ServiceException(e); 3616 } 3617 if (rsh.fullRegionScan) { 3618 rpcFullScanRequestCount.increment(); 3619 } 3620 HRegion region = rsh.r; 3621 LeaseManager.Lease lease; 3622 try { 3623 // Remove lease while its being processed in server; protects against case 3624 // where processing of request takes > lease expiration time. or null if none found. 3625 lease = regionServer.getLeaseManager().removeLease(scannerName); 3626 } catch (LeaseException e) { 3627 throw new ServiceException(e); 3628 } 3629 if (request.hasRenew() && request.getRenew()) { 3630 // add back and return 3631 addScannerLeaseBack(lease); 3632 try { 3633 checkScanNextCallSeq(request, rsh); 3634 } catch (OutOfOrderScannerNextException e) { 3635 throw new ServiceException(e); 3636 } 3637 return builder.build(); 3638 } 3639 OperationQuota quota; 3640 try { 3641 quota = getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize, 3642 rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference()); 3643 } catch (IOException e) { 3644 addScannerLeaseBack(lease); 3645 throw new ServiceException(e); 3646 } 3647 try { 3648 checkScanNextCallSeq(request, rsh); 3649 } catch (OutOfOrderScannerNextException e) { 3650 addScannerLeaseBack(lease); 3651 throw new ServiceException(e); 3652 } 3653 // Now we have increased the next call sequence. If we give client an error, the retry will 3654 // never success. So we'd better close the scanner and return a DoNotRetryIOException to client 3655 // and then client will try to open a new scanner. 3656 boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false; 3657 int rows; // this is scan.getCaching 3658 if (request.hasNumberOfRows()) { 3659 rows = request.getNumberOfRows(); 3660 } else { 3661 rows = closeScanner ? 0 : 1; 3662 } 3663 RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null); 3664 // now let's do the real scan. 
3665 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize()); 3666 RegionScanner scanner = rsh.s; 3667 // this is the limit of rows for this scan, if we the number of rows reach this value, we will 3668 // close the scanner. 3669 int limitOfRows; 3670 if (request.hasLimitOfRows()) { 3671 limitOfRows = request.getLimitOfRows(); 3672 } else { 3673 limitOfRows = -1; 3674 } 3675 boolean scannerClosed = false; 3676 try { 3677 List<Result> results = new ArrayList<>(Math.min(rows, 512)); 3678 if (rows > 0) { 3679 boolean done = false; 3680 // Call coprocessor. Get region info from scanner. 3681 if (region.getCoprocessorHost() != null) { 3682 Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows); 3683 if (!results.isEmpty()) { 3684 for (Result r : results) { 3685 // add cell size from CP results so we can track response size and update limits 3686 // when calling scan below if !done. We'll also have tracked block size if the CP 3687 // got results from hbase, since StoreScanner tracks that for all calls automatically. 3688 addSize(rpcCall, r); 3689 } 3690 } 3691 if (bypass != null && bypass.booleanValue()) { 3692 done = true; 3693 } 3694 } 3695 if (!done) { 3696 scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows, 3697 results, builder, rpcCall); 3698 } else { 3699 builder.setMoreResultsInRegion(!results.isEmpty()); 3700 } 3701 } else { 3702 // This is a open scanner call with numberOfRow = 0, so set more results in region to true. 3703 builder.setMoreResultsInRegion(true); 3704 } 3705 3706 quota.addScanResult(results); 3707 addResults(builder, results, (HBaseRpcController) controller, 3708 RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()), 3709 isClientCellBlockSupport(rpcCall)); 3710 if (scanner.isFilterDone() && results.isEmpty()) { 3711 // If the scanner's filter - if any - is done with the scan 3712 // only set moreResults to false if the results is empty. 
This is used to keep compatible 3713 // with the old scan implementation where we just ignore the returned results if moreResults 3714 // is false. Can remove the isEmpty check after we get rid of the old implementation. 3715 builder.setMoreResults(false); 3716 } 3717 // Later we may close the scanner depending on this flag so here we need to make sure that we 3718 // have already set this flag. 3719 assert builder.hasMoreResultsInRegion(); 3720 // we only set moreResults to false in the above code, so set it to true if we haven't set it 3721 // yet. 3722 if (!builder.hasMoreResults()) { 3723 builder.setMoreResults(true); 3724 } 3725 if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) { 3726 // Record the last cell of the last result if it is a partial result 3727 // We need this to calculate the complete rows we have returned to client as the 3728 // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the 3729 // current row. We may filter out all the remaining cells for the current row and just 3730 // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to 3731 // check for row change. 3732 Result lastResult = results.get(results.size() - 1); 3733 if (lastResult.mayHaveMoreCellsInRow()) { 3734 rsh.rowOfLastPartialResult = lastResult.getRow(); 3735 } else { 3736 rsh.rowOfLastPartialResult = null; 3737 } 3738 } 3739 if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) { 3740 scannerClosed = true; 3741 closeScanner(region, scanner, scannerName, rpcCall); 3742 } 3743 3744 // There's no point returning to a timed out client. 
Throwing ensures scanner is closed 3745 if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) { 3746 throw new TimeoutIOException("Client deadline exceeded, cannot return results"); 3747 } 3748 3749 return builder.build(); 3750 } catch (IOException e) { 3751 try { 3752 // scanner is closed here 3753 scannerClosed = true; 3754 // The scanner state might be left in a dirty state, so we will tell the Client to 3755 // fail this RPC and close the scanner while opening up another one from the start of 3756 // row that the client has last seen. 3757 closeScanner(region, scanner, scannerName, rpcCall); 3758 3759 // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is 3760 // used in two different semantics. 3761 // (1) The first is to close the client scanner and bubble up the exception all the way 3762 // to the application. This is preferred when the exception is really un-recoverable 3763 // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this 3764 // bucket usually. 3765 // (2) Second semantics is to close the current region scanner only, but continue the 3766 // client scanner by overriding the exception. This is usually UnknownScannerException, 3767 // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the 3768 // application-level ClientScanner has to continue without bubbling up the exception to 3769 // the client. See ClientScanner code to see how it deals with these special exceptions. 3770 if (e instanceof DoNotRetryIOException) { 3771 throw e; 3772 } 3773 3774 // If it is a FileNotFoundException, wrap as a 3775 // DoNotRetryIOException. This can avoid the retry in ClientScanner. 3776 if (e instanceof FileNotFoundException) { 3777 throw new DoNotRetryIOException(e); 3778 } 3779 3780 // We closed the scanner already. 
Instead of throwing the IOException, and client 3781 // retrying with the same scannerId only to get USE on the next RPC, we directly throw 3782 // a special exception to save an RPC. 3783 if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) { 3784 // 1.4.0+ clients know how to handle 3785 throw new ScannerResetException("Scanner is closed on the server-side", e); 3786 } else { 3787 // older clients do not know about SRE. Just throw USE, which they will handle 3788 throw new UnknownScannerException("Throwing UnknownScannerException to reset the client" 3789 + " scanner state for clients older than 1.3.", e); 3790 } 3791 } catch (IOException ioe) { 3792 throw new ServiceException(ioe); 3793 } 3794 } finally { 3795 if (!scannerClosed) { 3796 // Adding resets expiration time on lease. 3797 // the closeCallBack will be set in closeScanner so here we only care about shippedCallback 3798 if (rpcCall != null) { 3799 rpcCall.setCallBack(rsh.shippedCallback); 3800 } else { 3801 // If context is null,here we call rsh.shippedCallback directly to reuse the logic in 3802 // rsh.shippedCallback to release the internal resources in rsh,and lease is also added 3803 // back to regionserver's LeaseManager in rsh.shippedCallback. 3804 runShippedCallback(rsh); 3805 } 3806 } 3807 quota.close(); 3808 } 3809 } 3810 3811 private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException { 3812 assert rsh.shippedCallback != null; 3813 try { 3814 rsh.shippedCallback.run(); 3815 } catch (IOException ioe) { 3816 throw new ServiceException(ioe); 3817 } 3818 } 3819 3820 private void closeScanner(HRegion region, RegionScanner scanner, String scannerName, 3821 RpcCallContext context) throws IOException { 3822 if (region.getCoprocessorHost() != null) { 3823 if (region.getCoprocessorHost().preScannerClose(scanner)) { 3824 // bypass the actual close. 
3825 return; 3826 } 3827 } 3828 RegionScannerHolder rsh = scanners.remove(scannerName); 3829 if (rsh != null) { 3830 if (context != null) { 3831 context.setCallBack(rsh.closeCallBack); 3832 } else { 3833 rsh.s.close(); 3834 } 3835 if (region.getCoprocessorHost() != null) { 3836 region.getCoprocessorHost().postScannerClose(scanner); 3837 } 3838 closedScanners.put(scannerName, scannerName); 3839 } 3840 } 3841 3842 @Override 3843 public CoprocessorServiceResponse execRegionServerService(RpcController controller, 3844 CoprocessorServiceRequest request) throws ServiceException { 3845 rpcPreCheck("execRegionServerService"); 3846 return regionServer.execRegionServerService(controller, request); 3847 } 3848 3849 @Override 3850 public UpdateConfigurationResponse updateConfiguration(RpcController controller, 3851 UpdateConfigurationRequest request) throws ServiceException { 3852 try { 3853 requirePermission("updateConfiguration", Permission.Action.ADMIN); 3854 this.regionServer.updateConfiguration(); 3855 } catch (Exception e) { 3856 throw new ServiceException(e); 3857 } 3858 return UpdateConfigurationResponse.getDefaultInstance(); 3859 } 3860 3861 @Override 3862 public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller, 3863 GetSpaceQuotaSnapshotsRequest request) throws ServiceException { 3864 try { 3865 final RegionServerSpaceQuotaManager manager = regionServer.getRegionServerSpaceQuotaManager(); 3866 final GetSpaceQuotaSnapshotsResponse.Builder builder = 3867 GetSpaceQuotaSnapshotsResponse.newBuilder(); 3868 if (manager != null) { 3869 final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots(); 3870 for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) { 3871 builder.addSnapshots(TableQuotaSnapshot.newBuilder() 3872 .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey())) 3873 .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build()); 3874 } 3875 } 3876 return 
builder.build(); 3877 } catch (Exception e) { 3878 throw new ServiceException(e); 3879 } 3880 } 3881 3882 @Override 3883 public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller, 3884 ClearRegionBlockCacheRequest request) throws ServiceException { 3885 3886 try { 3887 rpcPreCheck("clearRegionBlockCache"); 3888 ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder(); 3889 CacheEvictionStatsBuilder stats = CacheEvictionStats.builder(); 3890 regionServer.getRegionServerCoprocessorHost().preClearRegionBlockCache(); 3891 List<HRegion> regions = getRegions(request.getRegionList(), stats); 3892 for (HRegion region : regions) { 3893 try { 3894 stats = stats.append(this.regionServer.clearRegionBlockCache(region)); 3895 } catch (Exception e) { 3896 stats.addException(region.getRegionInfo().getRegionName(), e); 3897 } 3898 } 3899 stats.withMaxCacheSize(regionServer.getBlockCache().map(BlockCache::getMaxSize).orElse(0L)); 3900 regionServer.getRegionServerCoprocessorHost().postClearRegionBlockCache(stats.build()); 3901 return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build(); 3902 } catch (IOException e) { 3903 throw new ServiceException(e); 3904 } 3905 } 3906 3907 private void executeOpenRegionProcedures(OpenRegionRequest request, 3908 Map<TableName, TableDescriptor> tdCache) { 3909 long masterSystemTime = request.hasMasterSystemTime() ? 
request.getMasterSystemTime() : -1; 3910 for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) { 3911 RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion()); 3912 TableName tableName = regionInfo.getTable(); 3913 TableDescriptor tableDesc = tdCache.get(tableName); 3914 if (tableDesc == null) { 3915 try { 3916 tableDesc = regionServer.getTableDescriptors().get(regionInfo.getTable()); 3917 } catch (IOException e) { 3918 // Here we do not fail the whole method since we also need deal with other 3919 // procedures, and we can not ignore this one, so we still schedule a 3920 // AssignRegionHandler and it will report back to master if we still can not get the 3921 // TableDescriptor. 3922 LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler", 3923 regionInfo.getTable(), e); 3924 } 3925 if (tableDesc != null) { 3926 tdCache.put(tableName, tableDesc); 3927 } 3928 } 3929 if (regionOpenInfo.getFavoredNodesCount() > 0) { 3930 regionServer.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(), 3931 regionOpenInfo.getFavoredNodesList()); 3932 } 3933 long procId = regionOpenInfo.getOpenProcId(); 3934 if (regionServer.submitRegionProcedure(procId)) { 3935 regionServer.executorService.submit(AssignRegionHandler.create(regionServer, regionInfo, 3936 procId, tableDesc, masterSystemTime)); 3937 } 3938 } 3939 } 3940 3941 private void executeCloseRegionProcedures(CloseRegionRequest request) { 3942 String encodedName; 3943 try { 3944 encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion()); 3945 } catch (DoNotRetryIOException e) { 3946 throw new UncheckedIOException("Should not happen", e); 3947 } 3948 ServerName destination = request.hasDestinationServer() 3949 ? 
ProtobufUtil.toServerName(request.getDestinationServer()) 3950 : null; 3951 long procId = request.getCloseProcId(); 3952 boolean evictCache = request.getEvictCache(); 3953 if (regionServer.submitRegionProcedure(procId)) { 3954 regionServer.getExecutorService().submit(UnassignRegionHandler.create(regionServer, 3955 encodedName, procId, false, destination, evictCache)); 3956 } 3957 } 3958 3959 private void executeProcedures(RemoteProcedureRequest request) { 3960 RSProcedureCallable callable; 3961 try { 3962 callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class) 3963 .getDeclaredConstructor().newInstance(); 3964 } catch (Exception e) { 3965 LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(), 3966 request.getProcId(), e); 3967 regionServer.remoteProcedureComplete(request.getProcId(), e); 3968 return; 3969 } 3970 callable.init(request.getProcData().toByteArray(), regionServer); 3971 LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId()); 3972 regionServer.executeProcedure(request.getProcId(), callable); 3973 } 3974 3975 @Override 3976 @QosPriority(priority = HConstants.ADMIN_QOS) 3977 public ExecuteProceduresResponse executeProcedures(RpcController controller, 3978 ExecuteProceduresRequest request) throws ServiceException { 3979 try { 3980 checkOpen(); 3981 throwOnWrongStartCode(request); 3982 regionServer.getRegionServerCoprocessorHost().preExecuteProcedures(); 3983 if (request.getOpenRegionCount() > 0) { 3984 // Avoid reading from the TableDescritor every time(usually it will read from the file 3985 // system) 3986 Map<TableName, TableDescriptor> tdCache = new HashMap<>(); 3987 request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache)); 3988 } 3989 if (request.getCloseRegionCount() > 0) { 3990 request.getCloseRegionList().forEach(this::executeCloseRegionProcedures); 3991 } 3992 if (request.getProcCount() > 0) { 3993 
request.getProcList().forEach(this::executeProcedures); 3994 } 3995 regionServer.getRegionServerCoprocessorHost().postExecuteProcedures(); 3996 return ExecuteProceduresResponse.getDefaultInstance(); 3997 } catch (IOException e) { 3998 throw new ServiceException(e); 3999 } 4000 } 4001 4002 @Override 4003 @QosPriority(priority = HConstants.ADMIN_QOS) 4004 public SlowLogResponses getSlowLogResponses(final RpcController controller, 4005 final SlowLogResponseRequest request) { 4006 final NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder(); 4007 final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder); 4008 SlowLogResponses slowLogResponses = 4009 SlowLogResponses.newBuilder().addAllSlowLogPayloads(slowLogPayloads).build(); 4010 return slowLogResponses; 4011 } 4012 4013 private List<SlowLogPayload> getSlowLogPayloads(SlowLogResponseRequest request, 4014 NamedQueueRecorder namedQueueRecorder) { 4015 if (namedQueueRecorder == null) { 4016 return Collections.emptyList(); 4017 } 4018 List<SlowLogPayload> slowLogPayloads; 4019 NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest(); 4020 namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT); 4021 namedQueueGetRequest.setSlowLogResponseRequest(request); 4022 NamedQueueGetResponse namedQueueGetResponse = 4023 namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest); 4024 slowLogPayloads = namedQueueGetResponse != null 4025 ? 
namedQueueGetResponse.getSlowLogPayloads() 4026 : Collections.emptyList(); 4027 return slowLogPayloads; 4028 } 4029 4030 @Override 4031 @QosPriority(priority = HConstants.ADMIN_QOS) 4032 public SlowLogResponses getLargeLogResponses(final RpcController controller, 4033 final SlowLogResponseRequest request) { 4034 final NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder(); 4035 final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder); 4036 SlowLogResponses slowLogResponses = 4037 SlowLogResponses.newBuilder().addAllSlowLogPayloads(slowLogPayloads).build(); 4038 return slowLogResponses; 4039 } 4040 4041 @Override 4042 @QosPriority(priority = HConstants.ADMIN_QOS) 4043 public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller, 4044 final ClearSlowLogResponseRequest request) throws ServiceException { 4045 rpcPreCheck("clearSlowLogsResponses"); 4046 final NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder(); 4047 boolean slowLogsCleaned = Optional.ofNullable(namedQueueRecorder) 4048 .map( 4049 queueRecorder -> queueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG)) 4050 .orElse(false); 4051 ClearSlowLogResponses clearSlowLogResponses = 4052 ClearSlowLogResponses.newBuilder().setIsCleaned(slowLogsCleaned).build(); 4053 return clearSlowLogResponses; 4054 } 4055 4056 @Override 4057 public HBaseProtos.LogEntry getLogEntries(RpcController controller, 4058 HBaseProtos.LogRequest request) throws ServiceException { 4059 try { 4060 final String logClassName = request.getLogClassName(); 4061 Class<?> logClass = Class.forName(logClassName).asSubclass(Message.class); 4062 Method method = logClass.getMethod("parseFrom", ByteString.class); 4063 if (logClassName.contains("SlowLogResponseRequest")) { 4064 SlowLogResponseRequest slowLogResponseRequest = 4065 (SlowLogResponseRequest) method.invoke(null, request.getLogMessage()); 4066 final 
NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder(); 4067 final List<SlowLogPayload> slowLogPayloads = 4068 getSlowLogPayloads(slowLogResponseRequest, namedQueueRecorder); 4069 SlowLogResponses slowLogResponses = 4070 SlowLogResponses.newBuilder().addAllSlowLogPayloads(slowLogPayloads).build(); 4071 return HBaseProtos.LogEntry.newBuilder() 4072 .setLogClassName(slowLogResponses.getClass().getName()) 4073 .setLogMessage(slowLogResponses.toByteString()).build(); 4074 } 4075 } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException 4076 | InvocationTargetException e) { 4077 LOG.error("Error while retrieving log entries.", e); 4078 throw new ServiceException(e); 4079 } 4080 throw new ServiceException("Invalid request params"); 4081 } 4082 4083 @Override 4084 public GetCachedFilesListResponse getCachedFilesList(RpcController controller, 4085 GetCachedFilesListRequest request) throws ServiceException { 4086 GetCachedFilesListResponse.Builder responseBuilder = GetCachedFilesListResponse.newBuilder(); 4087 List<String> fullyCachedFiles = new ArrayList<>(); 4088 regionServer.getBlockCache().flatMap(BlockCache::getFullyCachedFiles).ifPresent(fcf -> { 4089 fullyCachedFiles.addAll(fcf.keySet()); 4090 }); 4091 return responseBuilder.addAllCachedFiles(fullyCachedFiles).build(); 4092 } 4093 4094 public RpcScheduler getRpcScheduler() { 4095 return rpcServer.getScheduler(); 4096 } 4097 4098 protected AccessChecker getAccessChecker() { 4099 return accessChecker; 4100 } 4101 4102 protected ZKPermissionWatcher getZkPermissionWatcher() { 4103 return zkPermissionWatcher; 4104 } 4105 4106 @Override 4107 public GetClusterIdResponse getClusterId(RpcController controller, GetClusterIdRequest request) 4108 throws ServiceException { 4109 return GetClusterIdResponse.newBuilder().setClusterId(regionServer.getClusterId()).build(); 4110 } 4111 4112 @Override 4113 public GetActiveMasterResponse getActiveMaster(RpcController controller, 4114 
GetActiveMasterRequest request) throws ServiceException { 4115 GetActiveMasterResponse.Builder builder = GetActiveMasterResponse.newBuilder(); 4116 regionServer.getActiveMaster() 4117 .ifPresent(name -> builder.setServerName(ProtobufUtil.toServerName(name))); 4118 return builder.build(); 4119 } 4120 4121 @Override 4122 public GetMastersResponse getMasters(RpcController controller, GetMastersRequest request) 4123 throws ServiceException { 4124 GetMastersResponse.Builder builder = GetMastersResponse.newBuilder(); 4125 regionServer.getActiveMaster() 4126 .ifPresent(activeMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder() 4127 .setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true))); 4128 regionServer.getBackupMasters() 4129 .forEach(backupMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder() 4130 .setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false))); 4131 return builder.build(); 4132 } 4133 4134 @Override 4135 public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController controller, 4136 GetMetaRegionLocationsRequest request) throws ServiceException { 4137 GetMetaRegionLocationsResponse.Builder builder = GetMetaRegionLocationsResponse.newBuilder(); 4138 Optional<List<HRegionLocation>> metaLocations = 4139 regionServer.getMetaRegionLocationCache().getMetaRegionLocations(); 4140 metaLocations.ifPresent(hRegionLocations -> hRegionLocations 4141 .forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location)))); 4142 return builder.build(); 4143 } 4144 4145 @Override 4146 public final GetBootstrapNodesResponse getBootstrapNodes(RpcController controller, 4147 GetBootstrapNodesRequest request) throws ServiceException { 4148 int maxNodeCount = regionServer.getConfiguration().getInt(CLIENT_BOOTSTRAP_NODE_LIMIT, 4149 DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT); 4150 ReservoirSample<ServerName> sample = new ReservoirSample<>(maxNodeCount); 4151 
sample.add(regionServer.getBootstrapNodes()); 4152 4153 GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder(); 4154 sample.getSamplingResult().stream().map(ProtobufUtil::toServerName) 4155 .forEach(builder::addServerName); 4156 return builder.build(); 4157 } 4158 4159 @Override 4160 public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller, 4161 GetAllBootstrapNodesRequest request) throws ServiceException { 4162 GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder(); 4163 regionServer.getBootstrapNodes() 4164 .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server))); 4165 return builder.build(); 4166 } 4167 4168 private void setReloadableGuardrails(Configuration conf) { 4169 rowSizeWarnThreshold = 4170 conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT); 4171 rejectRowsWithSizeOverThreshold = 4172 conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD); 4173 maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 4174 HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); 4175 } 4176}