001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.io.UncheckedIOException;
024import java.lang.reflect.InvocationTargetException;
025import java.lang.reflect.Method;
026import java.net.BindException;
027import java.net.InetAddress;
028import java.net.InetSocketAddress;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.Collections;
032import java.util.HashMap;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Map;
036import java.util.Map.Entry;
037import java.util.NavigableMap;
038import java.util.Optional;
039import java.util.Set;
040import java.util.TreeSet;
041import java.util.concurrent.ConcurrentHashMap;
042import java.util.concurrent.ConcurrentMap;
043import java.util.concurrent.TimeUnit;
044import java.util.concurrent.atomic.AtomicBoolean;
045import java.util.concurrent.atomic.AtomicLong;
046import java.util.concurrent.atomic.LongAdder;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.FileSystem;
049import org.apache.hadoop.fs.Path;
050import org.apache.hadoop.hbase.CacheEvictionStats;
051import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
052import org.apache.hadoop.hbase.Cell;
053import org.apache.hadoop.hbase.CellScannable;
054import org.apache.hadoop.hbase.CellScanner;
055import org.apache.hadoop.hbase.CellUtil;
056import org.apache.hadoop.hbase.DoNotRetryIOException;
057import org.apache.hadoop.hbase.DroppedSnapshotException;
058import org.apache.hadoop.hbase.HBaseIOException;
059import org.apache.hadoop.hbase.HConstants;
060import org.apache.hadoop.hbase.HRegionLocation;
061import org.apache.hadoop.hbase.MultiActionResultTooLarge;
062import org.apache.hadoop.hbase.NotServingRegionException;
063import org.apache.hadoop.hbase.PrivateCellUtil;
064import org.apache.hadoop.hbase.RegionTooBusyException;
065import org.apache.hadoop.hbase.Server;
066import org.apache.hadoop.hbase.ServerName;
067import org.apache.hadoop.hbase.TableName;
068import org.apache.hadoop.hbase.UnknownScannerException;
069import org.apache.hadoop.hbase.client.Append;
070import org.apache.hadoop.hbase.client.CheckAndMutate;
071import org.apache.hadoop.hbase.client.CheckAndMutateResult;
072import org.apache.hadoop.hbase.client.ConnectionUtils;
073import org.apache.hadoop.hbase.client.Delete;
074import org.apache.hadoop.hbase.client.Durability;
075import org.apache.hadoop.hbase.client.Get;
076import org.apache.hadoop.hbase.client.Increment;
077import org.apache.hadoop.hbase.client.Mutation;
078import org.apache.hadoop.hbase.client.OperationWithAttributes;
079import org.apache.hadoop.hbase.client.Put;
080import org.apache.hadoop.hbase.client.RegionInfo;
081import org.apache.hadoop.hbase.client.RegionReplicaUtil;
082import org.apache.hadoop.hbase.client.Result;
083import org.apache.hadoop.hbase.client.Row;
084import org.apache.hadoop.hbase.client.Scan;
085import org.apache.hadoop.hbase.client.TableDescriptor;
086import org.apache.hadoop.hbase.client.VersionInfoUtil;
087import org.apache.hadoop.hbase.conf.ConfigurationObserver;
088import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
089import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
090import org.apache.hadoop.hbase.exceptions.ScannerResetException;
091import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
092import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
093import org.apache.hadoop.hbase.io.ByteBuffAllocator;
094import org.apache.hadoop.hbase.io.hfile.BlockCache;
095import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
096import org.apache.hadoop.hbase.ipc.HBaseRpcController;
097import org.apache.hadoop.hbase.ipc.PriorityFunction;
098import org.apache.hadoop.hbase.ipc.QosPriority;
099import org.apache.hadoop.hbase.ipc.RpcCall;
100import org.apache.hadoop.hbase.ipc.RpcCallContext;
101import org.apache.hadoop.hbase.ipc.RpcCallback;
102import org.apache.hadoop.hbase.ipc.RpcScheduler;
103import org.apache.hadoop.hbase.ipc.RpcServer;
104import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
105import org.apache.hadoop.hbase.ipc.RpcServerFactory;
106import org.apache.hadoop.hbase.ipc.RpcServerInterface;
107import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
108import org.apache.hadoop.hbase.ipc.ServerRpcController;
109import org.apache.hadoop.hbase.log.HBaseMarkers;
110import org.apache.hadoop.hbase.master.HMaster;
111import org.apache.hadoop.hbase.master.MasterRpcServices;
112import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
113import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
114import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
115import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
116import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
117import org.apache.hadoop.hbase.net.Address;
118import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
119import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
120import org.apache.hadoop.hbase.quotas.OperationQuota;
121import org.apache.hadoop.hbase.quotas.QuotaUtil;
122import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
123import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
124import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
125import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
126import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease;
127import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException;
128import org.apache.hadoop.hbase.regionserver.Region.Operation;
129import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
130import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
131import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler;
132import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
133import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
134import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
135import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
136import org.apache.hadoop.hbase.security.Superusers;
137import org.apache.hadoop.hbase.security.User;
138import org.apache.hadoop.hbase.security.access.AccessChecker;
139import org.apache.hadoop.hbase.security.access.NoopAccessChecker;
140import org.apache.hadoop.hbase.security.access.Permission;
141import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
142import org.apache.hadoop.hbase.util.Bytes;
143import org.apache.hadoop.hbase.util.DNS;
144import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
145import org.apache.hadoop.hbase.util.Pair;
146import org.apache.hadoop.hbase.util.ReservoirSample;
147import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
148import org.apache.hadoop.hbase.wal.WAL;
149import org.apache.hadoop.hbase.wal.WALEdit;
150import org.apache.hadoop.hbase.wal.WALKey;
151import org.apache.hadoop.hbase.wal.WALSplitUtil;
152import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
153import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
154import org.apache.yetus.audience.InterfaceAudience;
155import org.apache.zookeeper.KeeperException;
156import org.slf4j.Logger;
157import org.slf4j.LoggerFactory;
158
159import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
160import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
161import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
162import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
163import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
164import org.apache.hbase.thirdparty.com.google.protobuf.Message;
165import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
166import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
167import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
168import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
169import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
170
171import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
172import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
173import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
282
283/**
284 * Implements the regionserver RPC services.
285 */
286@InterfaceAudience.Private
287@SuppressWarnings("deprecation")
288public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.BlockingInterface,
289  ClientService.BlockingInterface, ClientMetaService.BlockingInterface,
290  BootstrapNodeService.BlockingInterface, PriorityFunction, ConfigurationObserver {
  protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);

  /** Configuration key for the RPC scheduler factory class to use for the region server. */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

  /** Configuration key for the RPC scheduler factory class to use for the master. */
  public static final String MASTER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.master.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value (milliseconds) of
   * {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  /** Configuration key for the client bootstrap node limit. */
  public static final String CLIENT_BOOTSTRAP_NODE_LIMIT = "hbase.client.bootstrap.node.limit";

  /** Default value of {@link RSRpcServices#CLIENT_BOOTSTRAP_NODE_LIMIT} */
  public static final int DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT = 10;
328
  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Server to handle client requests.
  final RpcServerInterface rpcServer;
  // Address the RPC server is bound to.
  final InetSocketAddress isa;

  // The region server these RPC services front for.
  protected final HRegionServer regionServer;
  // volatile: presumably updated at runtime on configuration change — confirm in
  // onConfigurationChange.
  private volatile long maxScannerResultSize;

  // The reference to the priority extraction function
  private final PriorityFunction priority;

  // Generates ids for the region scanners tracked in the scanners map below.
  private ScannerIdGenerator scannerIdGenerator;
  // Currently open region scanners, keyed by scanner name. Entries are removed on close or
  // when the scanner lease expires (see ScannerListener).
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
  // which may send next or close request to a region scanner which has already been exhausted. The
  // entries will be removed automatically after scannerLeaseTimeoutPeriod.
  private final Cache<String, String> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta (milliseconds) to use for the scan time limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private volatile int rowSizeWarnThreshold;
  /**
   * Whether we should reject requests with very high no of rows i.e. beyond threshold defined by
   * rowSizeWarnThreshold
   */
  private volatile boolean rejectRowsWithSizeOverThreshold;

  // Guard flag — presumably ensures only one clear-compaction-queues request runs at a time;
  // confirm at the use site (clearCompactionQueues RPC).
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  // Permission checker for admin-level RPCs (see rpcPreCheck/requirePermission).
  private AccessChecker accessChecker;
  // ZooKeeper watcher for permission changes; lifecycle managed alongside accessChecker.
  private ZKPermissionWatcher zkPermissionWatcher;
393
394  /**
395   * Services launched in RSRpcServices. By default they are on but you can use the below booleans
396   * to selectively enable/disable these services (Rare is the case where you would ever turn off
397   * one or the other).
398   */
399  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
400    "hbase.regionserver.admin.executorService";
401  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
402    "hbase.regionserver.client.executorService";
403  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
404    "hbase.regionserver.client.meta.executorService";
405  public static final String REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG =
406    "hbase.regionserver.bootstrap.nodes.executorService";
407
408  /**
409   * An Rpc callback for closing a RegionScanner.
410   */
411  private static final class RegionScannerCloseCallBack implements RpcCallback {
412
413    private final RegionScanner scanner;
414
415    public RegionScannerCloseCallBack(RegionScanner scanner) {
416      this.scanner = scanner;
417    }
418
419    @Override
420    public void run() throws IOException {
421      this.scanner.close();
422    }
423  }
424
425  /**
426   * An Rpc callback for doing shipped() call on a RegionScanner.
427   */
428  private class RegionScannerShippedCallBack implements RpcCallback {
429    private final String scannerName;
430    private final Shipper shipper;
431    private final Lease lease;
432
433    public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
434      this.scannerName = scannerName;
435      this.shipper = shipper;
436      this.lease = lease;
437    }
438
439    @Override
440    public void run() throws IOException {
441      this.shipper.shipped();
442      // We're done. On way out re-add the above removed lease. The lease was temp removed for this
443      // Rpc call and we are at end of the call now. Time to add it back.
444      if (scanners.containsKey(scannerName)) {
445        if (lease != null) {
446          regionServer.getLeaseManager().addLease(lease);
447        }
448      }
449    }
450  }
451
452  /**
453   * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
454   * completion of multiGets.
455   */
456  static class RegionScannersCloseCallBack implements RpcCallback {
457    private final List<RegionScanner> scanners = new ArrayList<>();
458
459    public void addScanner(RegionScanner scanner) {
460      this.scanners.add(scanner);
461    }
462
463    @Override
464    public void run() {
465      for (RegionScanner scanner : scanners) {
466        try {
467          scanner.close();
468        } catch (IOException e) {
469          LOG.error("Exception while closing the scanner " + scanner, e);
470        }
471      }
472    }
473  }
474
475  /**
476   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
477   */
478  static final class RegionScannerHolder {
479    private final AtomicLong nextCallSeq = new AtomicLong(0);
480    private final RegionScanner s;
481    private final HRegion r;
482    private final RpcCallback closeCallBack;
483    private final RpcCallback shippedCallback;
484    private byte[] rowOfLastPartialResult;
485    private boolean needCursor;
486    private boolean fullRegionScan;
487    private final String clientIPAndPort;
488    private final String userName;
489    private volatile long maxBlockBytesScanned = 0;
490    private volatile long prevBlockBytesScanned = 0;
491    private volatile long prevBlockBytesScannedDifference = 0;
492
493    RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
494      RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
495      String clientIPAndPort, String userName) {
496      this.s = s;
497      this.r = r;
498      this.closeCallBack = closeCallBack;
499      this.shippedCallback = shippedCallback;
500      this.needCursor = needCursor;
501      this.fullRegionScan = fullRegionScan;
502      this.clientIPAndPort = clientIPAndPort;
503      this.userName = userName;
504    }
505
506    long getNextCallSeq() {
507      return nextCallSeq.get();
508    }
509
510    boolean incNextCallSeq(long currentSeq) {
511      // Use CAS to prevent multiple scan request running on the same scanner.
512      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
513    }
514
515    long getMaxBlockBytesScanned() {
516      return maxBlockBytesScanned;
517    }
518
519    long getPrevBlockBytesScannedDifference() {
520      return prevBlockBytesScannedDifference;
521    }
522
523    void updateBlockBytesScanned(long blockBytesScanned) {
524      prevBlockBytesScannedDifference = blockBytesScanned - prevBlockBytesScanned;
525      prevBlockBytesScanned = blockBytesScanned;
526      if (blockBytesScanned > maxBlockBytesScanned) {
527        maxBlockBytesScanned = blockBytesScanned;
528      }
529    }
530
531    // Should be called only when we need to print lease expired messages otherwise
532    // cache the String once made.
533    @Override
534    public String toString() {
535      return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName
536        + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString();
537    }
538  }
539
540  /**
541   * Instantiated as a scanner lease. If the lease times out, the scanner is closed
542   */
543  private class ScannerListener implements LeaseListener {
544    private final String scannerName;
545
546    ScannerListener(final String n) {
547      this.scannerName = n;
548    }
549
550    @Override
551    public void leaseExpired() {
552      RegionScannerHolder rsh = scanners.remove(this.scannerName);
553      if (rsh == null) {
554        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
555        return;
556      }
557      LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
558      regionServer.getMetrics().incrScannerLeaseExpired();
559      RegionScanner s = rsh.s;
560      HRegion region = null;
561      try {
562        region = regionServer.getRegion(s.getRegionInfo().getRegionName());
563        if (region != null && region.getCoprocessorHost() != null) {
564          region.getCoprocessorHost().preScannerClose(s);
565        }
566      } catch (IOException e) {
567        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
568      } finally {
569        try {
570          s.close();
571          if (region != null && region.getCoprocessorHost() != null) {
572            region.getCoprocessorHost().postScannerClose(s);
573          }
574        } catch (IOException e) {
575          LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
576        }
577      }
578    }
579  }
580
581  private static ResultOrException getResultOrException(final ClientProtos.Result r,
582    final int index) {
583    return getResultOrException(ResponseConverter.buildActionResult(r), index);
584  }
585
586  private static ResultOrException getResultOrException(final Exception e, final int index) {
587    return getResultOrException(ResponseConverter.buildActionResult(e), index);
588  }
589
590  private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
591    final int index) {
592    return builder.setIndex(index).build();
593  }
594
595  /**
596   * Checks for the following pre-checks in order:
597   * <ol>
598   * <li>RegionServer is running</li>
599   * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
600   * </ol>
601   * @param requestName name of rpc request. Used in reporting failures to provide context.
602   * @throws ServiceException If any of the above listed pre-check fails.
603   */
604  private void rpcPreCheck(String requestName) throws ServiceException {
605    try {
606      checkOpen();
607      requirePermission(requestName, Permission.Action.ADMIN);
608    } catch (IOException ioe) {
609      throw new ServiceException(ioe);
610    }
611  }
612
613  private boolean isClientCellBlockSupport(RpcCallContext context) {
614    return context != null && context.isClientCellBlockSupported();
615  }
616
617  private void addResult(final MutateResponse.Builder builder, final Result result,
618    final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
619    if (result == null) return;
620    if (clientCellBlockSupported) {
621      builder.setResult(ProtobufUtil.toResultNoData(result));
622      rpcc.setCellScanner(result.cellScanner());
623    } else {
624      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
625      builder.setResult(pbr);
626    }
627  }
628
629  private void addResults(ScanResponse.Builder builder, List<Result> results,
630    HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
631    builder.setStale(!isDefaultRegion);
632    if (results.isEmpty()) {
633      return;
634    }
635    if (clientCellBlockSupported) {
636      for (Result res : results) {
637        builder.addCellsPerResult(res.size());
638        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
639      }
640      controller.setCellScanner(CellUtil.createCellScanner(results));
641    } else {
642      for (Result res : results) {
643        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
644        builder.addResults(pbr);
645      }
646    }
647  }
648
  /**
   * Executes a client checkAndMutate: converts the given actions into mutations and applies them
   * atomically under {@code condition}, invoking the region coprocessor pre/post hooks around the
   * region call. Only mutation actions are accepted; a Get action is rejected.
   * @param actions               the mutation actions to apply; must not contain Gets
   * @param cellScanner           carries the cell data referenced by the mutation protos
   * @param condition             the condition guarding the mutations
   * @param nonceGroup            nonce group for Increment/Append idempotence
   * @param spaceQuotaEnforcement active space-quota policies checked per mutation
   * @return the result; a pre-hook may bypass the region call by returning a non-null result
   * @throws IOException on conversion failure, quota violation, or region-level failure
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
    CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Count of actions whose cells were fully consumed from cellScanner. The finally block uses
    // it to skip the cells of any remaining actions so the scanner stays aligned for later use.
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaRegion()) {
        regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      List<Mutation> mutations = new ArrayList<>();
      // Only the last Increment/Append's nonce is kept; Puts/Deletes do not carry nonces.
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : actions) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto mutation = action.getMutation();
        MutationType type = mutation.getMutateType();
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(mutation, cellScanner);
            // Increment immediately after the toXxx call: the scanner has now consumed this
            // mutation's cells even if a later check below throws.
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            mutations.add(put);
            break;
          case DELETE:
            Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
            ++countOfCompleteMutation;
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            mutations.add(del);
            break;
          case INCREMENT:
            Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, increment);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
            mutations.add(increment);
            break;
          case APPEND:
            Append append = ProtobufUtil.toAppend(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, append);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
            mutations.add(append);
            break;
          default:
            throw new DoNotRetryIOException("invalid mutation type : " + type);
        }
      }

      if (mutations.size() == 0) {
        // Nothing to mutate: report success with no result.
        return new CheckAndMutateResult(true, null);
      } else {
        CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
        CheckAndMutateResult result = null;
        if (region.getCoprocessorHost() != null) {
          result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
        }
        // A non-null pre-hook result bypasses the region operation (and the post hook).
        if (result == null) {
          result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
          if (region.getCoprocessorHost() != null) {
            result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
          }
        }
        return result;
      }
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }
725
726  /**
727   * Execute an append mutation.
728   * @return result to return to client if default operation should be bypassed as indicated by
729   *         RegionObserver, null otherwise
730   */
731  private Result append(final HRegion region, final OperationQuota quota,
732    final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
733    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
734    long before = EnvironmentEdgeManager.currentTime();
735    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
736    checkCellSizeLimit(region, append);
737    spaceQuota.getPolicyEnforcement(region).check(append);
738    quota.addMutation(append);
739    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
740    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
741    Result r = region.append(append, nonceGroup, nonce);
742    if (regionServer.getMetrics() != null) {
743      long blockBytesScanned =
744        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
745      regionServer.getMetrics().updateAppend(region, EnvironmentEdgeManager.currentTime() - before,
746        blockBytesScanned);
747    }
748    return r == null ? Result.EMPTY_RESULT : r;
749  }
750
751  /**
752   * Execute an increment mutation.
753   */
754  private Result increment(final HRegion region, final OperationQuota quota,
755    final MutationProto mutation, final CellScanner cells, long nonceGroup,
756    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
757    long before = EnvironmentEdgeManager.currentTime();
758    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
759    checkCellSizeLimit(region, increment);
760    spaceQuota.getPolicyEnforcement(region).check(increment);
761    quota.addMutation(increment);
762    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
763    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
764    Result r = region.increment(increment, nonceGroup, nonce);
765    final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
766    if (metricsRegionServer != null) {
767      long blockBytesScanned =
768        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
769      metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before,
770        blockBytesScanned);
771    }
772    return r == null ? Result.EMPTY_RESULT : r;
773  }
774
775  /**
776   * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
777   * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
778   * @param cellsToReturn Could be null. May be allocated in this method. This is what this method
779   *                      returns as a 'result'.
780   * @param closeCallBack the callback to be used with multigets
781   * @param context       the current RpcCallContext
782   * @return Return the <code>cellScanner</code> passed
783   */
784  private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
785    final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
786    final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
787    final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
788    ActivePolicyEnforcement spaceQuotaEnforcement) {
789    // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
790    // one at a time, we instead pass them in batch. Be aware that the corresponding
791    // ResultOrException instance that matches each Put or Delete is then added down in the
792    // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
793    // deferred/batched
794    List<ClientProtos.Action> mutations = null;
795    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
796    IOException sizeIOE = null;
797    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
798      ResultOrException.newBuilder();
799    boolean hasResultOrException = false;
800    for (ClientProtos.Action action : actions.getActionList()) {
801      hasResultOrException = false;
802      resultOrExceptionBuilder.clear();
803      try {
804        Result r = null;
805        long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
806        if (
807          context != null && context.isRetryImmediatelySupported()
808            && (context.getResponseCellSize() > maxQuotaResultSize
809              || blockBytesScannedBefore + context.getResponseExceptionSize() > maxQuotaResultSize)
810        ) {
811
812          // We're storing the exception since the exception and reason string won't
813          // change after the response size limit is reached.
814          if (sizeIOE == null) {
815            // We don't need the stack un-winding do don't throw the exception.
816            // Throwing will kill the JVM's JIT.
817            //
818            // Instead just create the exception and then store it.
819            sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
820              + context.getResponseCellSize() + " BlockSize: " + blockBytesScannedBefore);
821
822            // Only report the exception once since there's only one request that
823            // caused the exception. Otherwise this number will dominate the exceptions count.
824            rpcServer.getMetrics().exception(sizeIOE);
825          }
826
827          // Now that there's an exception is known to be created
828          // use it for the response.
829          //
830          // This will create a copy in the builder.
831          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
832          resultOrExceptionBuilder.setException(pair);
833          context.incrementResponseExceptionSize(pair.getSerializedSize());
834          resultOrExceptionBuilder.setIndex(action.getIndex());
835          builder.addResultOrException(resultOrExceptionBuilder.build());
836          skipCellsForMutation(action, cellScanner);
837          continue;
838        }
839        if (action.hasGet()) {
840          long before = EnvironmentEdgeManager.currentTime();
841          ClientProtos.Get pbGet = action.getGet();
842          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
843          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
844          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
845          // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
846          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
847            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
848              + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
849              + "reverse Scan.");
850          }
851          try {
852            Get get = ProtobufUtil.toGet(pbGet);
853            if (context != null) {
854              r = get(get, (region), closeCallBack, context);
855            } else {
856              r = region.get(get);
857            }
858          } finally {
859            final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
860            if (metricsRegionServer != null) {
861              long blockBytesScanned =
862                context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
863              metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
864                blockBytesScanned);
865            }
866          }
867        } else if (action.hasServiceCall()) {
868          hasResultOrException = true;
869          com.google.protobuf.Message result = execServiceOnRegion(region, action.getServiceCall());
870          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
871            ClientProtos.CoprocessorServiceResult.newBuilder();
872          resultOrExceptionBuilder.setServiceResult(serviceResultBuilder
873            .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName())
874              // TODO: Copy!!!
875              .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
876        } else if (action.hasMutation()) {
877          MutationType type = action.getMutation().getMutateType();
878          if (
879            type != MutationType.PUT && type != MutationType.DELETE && mutations != null
880              && !mutations.isEmpty()
881          ) {
882            // Flush out any Puts or Deletes already collected.
883            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
884              spaceQuotaEnforcement);
885            mutations.clear();
886          }
887          switch (type) {
888            case APPEND:
889              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
890                spaceQuotaEnforcement, context);
891              break;
892            case INCREMENT:
893              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
894                spaceQuotaEnforcement, context);
895              break;
896            case PUT:
897            case DELETE:
898              // Collect the individual mutations and apply in a batch
899              if (mutations == null) {
900                mutations = new ArrayList<>(actions.getActionCount());
901              }
902              mutations.add(action);
903              break;
904            default:
905              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
906          }
907        } else {
908          throw new HBaseIOException("Unexpected Action type");
909        }
910        if (r != null) {
911          ClientProtos.Result pbResult = null;
912          if (isClientCellBlockSupport(context)) {
913            pbResult = ProtobufUtil.toResultNoData(r);
914            // Hard to guess the size here. Just make a rough guess.
915            if (cellsToReturn == null) {
916              cellsToReturn = new ArrayList<>();
917            }
918            cellsToReturn.add(r);
919          } else {
920            pbResult = ProtobufUtil.toResult(r);
921          }
922          addSize(context, r);
923          hasResultOrException = true;
924          resultOrExceptionBuilder.setResult(pbResult);
925        }
926        // Could get to here and there was no result and no exception. Presumes we added
927        // a Put or Delete to the collecting Mutations List for adding later. In this
928        // case the corresponding ResultOrException instance for the Put or Delete will be added
929        // down in the doNonAtomicBatchOp method call rather than up here.
930      } catch (IOException ie) {
931        rpcServer.getMetrics().exception(ie);
932        hasResultOrException = true;
933        NameBytesPair pair = ResponseConverter.buildException(ie);
934        resultOrExceptionBuilder.setException(pair);
935        context.incrementResponseExceptionSize(pair.getSerializedSize());
936      }
937      if (hasResultOrException) {
938        // Propagate index.
939        resultOrExceptionBuilder.setIndex(action.getIndex());
940        builder.addResultOrException(resultOrExceptionBuilder.build());
941      }
942    }
943    // Finish up any outstanding mutations
944    if (!CollectionUtils.isEmpty(mutations)) {
945      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
946    }
947    return cellsToReturn;
948  }
949
950  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
951    if (r.maxCellSize > 0) {
952      CellScanner cells = m.cellScanner();
953      while (cells.advance()) {
954        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
955        if (size > r.maxCellSize) {
956          String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of "
957            + r.maxCellSize + " bytes";
958          LOG.debug(msg);
959          throw new DoNotRetryIOException(msg);
960        }
961      }
962    }
963  }
964
965  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
966    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
967    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
968    // Just throw the exception. The exception will be caught and then added to region-level
969    // exception for RegionAction. Leaving the null to action result is ok since the null
970    // result is viewed as failure by hbase client. And the region-lever exception will be used
971    // to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and
972    // AsyncBatchRpcRetryingCaller#onComplete for more details.
973    doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
974  }
975
976  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
977    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
978    ActivePolicyEnforcement spaceQuotaEnforcement) {
979    try {
980      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
981        spaceQuotaEnforcement, false);
982    } catch (IOException e) {
983      // Set the exception for each action. The mutations in same RegionAction are group to
984      // different batch and then be processed individually. Hence, we don't set the region-level
985      // exception here for whole RegionAction.
986      for (Action mutation : mutations) {
987        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
988      }
989    }
990  }
991
992  /**
993   * Execute a list of mutations.
994   */
995  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
996    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
997    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
998    throws IOException {
999    Mutation[] mArray = new Mutation[mutations.size()];
1000    long before = EnvironmentEdgeManager.currentTime();
1001    boolean batchContainsPuts = false, batchContainsDelete = false;
1002    try {
1003      /**
1004       * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions
1005       * since mutation array may have been reoredered.In order to return the right result or
1006       * exception to the corresponding actions, We need to know which action is the mutation belong
1007       * to. We can't sort ClientProtos.Action array, since they are bonded to cellscanners.
1008       */
1009      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
1010      int i = 0;
1011      long nonce = HConstants.NO_NONCE;
1012      for (ClientProtos.Action action : mutations) {
1013        if (action.hasGet()) {
1014          throw new DoNotRetryIOException(
1015            "Atomic put and/or delete only, not a Get=" + action.getGet());
1016        }
1017        MutationProto m = action.getMutation();
1018        Mutation mutation;
1019        switch (m.getMutateType()) {
1020          case PUT:
1021            mutation = ProtobufUtil.toPut(m, cells);
1022            batchContainsPuts = true;
1023            break;
1024
1025          case DELETE:
1026            mutation = ProtobufUtil.toDelete(m, cells);
1027            batchContainsDelete = true;
1028            break;
1029
1030          case INCREMENT:
1031            mutation = ProtobufUtil.toIncrement(m, cells);
1032            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
1033            break;
1034
1035          case APPEND:
1036            mutation = ProtobufUtil.toAppend(m, cells);
1037            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
1038            break;
1039
1040          default:
1041            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
1042        }
1043        mutationActionMap.put(mutation, action);
1044        mArray[i++] = mutation;
1045        checkCellSizeLimit(region, mutation);
1046        // Check if a space quota disallows this mutation
1047        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
1048        quota.addMutation(mutation);
1049      }
1050
1051      if (!region.getRegionInfo().isMetaRegion()) {
1052        regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
1053      }
1054
1055      // HBASE-17924
1056      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
1057      // order is preserved as its expected from the client
1058      if (!atomic) {
1059        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
1060      }
1061
1062      OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);
1063
1064      // When atomic is true, it indicates that the mutateRow API or the batch API with
1065      // RowMutations is called. In this case, we need to merge the results of the
1066      // Increment/Append operations if the mutations include those operations, and set the merged
1067      // result to the first element of the ResultOrException list
1068      if (atomic) {
1069        List<ResultOrException> resultOrExceptions = new ArrayList<>();
1070        List<Result> results = new ArrayList<>();
1071        for (i = 0; i < codes.length; i++) {
1072          if (codes[i].getResult() != null) {
1073            results.add(codes[i].getResult());
1074          }
1075          if (i != 0) {
1076            resultOrExceptions
1077              .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i));
1078          }
1079        }
1080
1081        if (results.isEmpty()) {
1082          builder.addResultOrException(
1083            getResultOrException(ClientProtos.Result.getDefaultInstance(), 0));
1084        } else {
1085          // Merge the results of the Increment/Append operations
1086          List<Cell> cellList = new ArrayList<>();
1087          for (Result result : results) {
1088            if (result.rawCells() != null) {
1089              cellList.addAll(Arrays.asList(result.rawCells()));
1090            }
1091          }
1092          Result result = Result.create(cellList);
1093
1094          // Set the merged result of the Increment/Append operations to the first element of the
1095          // ResultOrException list
1096          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
1097        }
1098
1099        builder.addAllResultOrException(resultOrExceptions);
1100        return;
1101      }
1102
1103      for (i = 0; i < codes.length; i++) {
1104        Mutation currentMutation = mArray[i];
1105        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
1106        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
1107        Exception e;
1108        switch (codes[i].getOperationStatusCode()) {
1109          case BAD_FAMILY:
1110            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
1111            builder.addResultOrException(getResultOrException(e, index));
1112            break;
1113
1114          case SANITY_CHECK_FAILURE:
1115            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
1116            builder.addResultOrException(getResultOrException(e, index));
1117            break;
1118
1119          default:
1120            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
1121            builder.addResultOrException(getResultOrException(e, index));
1122            break;
1123
1124          case SUCCESS:
1125            ClientProtos.Result result = codes[i].getResult() == null
1126              ? ClientProtos.Result.getDefaultInstance()
1127              : ProtobufUtil.toResult(codes[i].getResult());
1128            builder.addResultOrException(getResultOrException(result, index));
1129            break;
1130
1131          case STORE_TOO_BUSY:
1132            e = new RegionTooBusyException(codes[i].getExceptionMsg());
1133            builder.addResultOrException(getResultOrException(e, index));
1134            break;
1135        }
1136      }
1137    } finally {
1138      int processedMutationIndex = 0;
1139      for (Action mutation : mutations) {
1140        // The non-null mArray[i] means the cell scanner has been read.
1141        if (mArray[processedMutationIndex++] == null) {
1142          skipCellsForMutation(mutation, cells);
1143        }
1144      }
1145      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1146    }
1147  }
1148
1149  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
1150    boolean batchContainsDelete) {
1151    final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
1152    if (metricsRegionServer != null) {
1153      long after = EnvironmentEdgeManager.currentTime();
1154      if (batchContainsPuts) {
1155        metricsRegionServer.updatePutBatch(region, after - starttime);
1156      }
1157      if (batchContainsDelete) {
1158        metricsRegionServer.updateDeleteBatch(region, after - starttime);
1159      }
1160    }
1161  }
1162
1163  /**
1164   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
1165   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
1166   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
1167   *         exceptionMessage if any
1168   */
1169  private OperationStatus[] doReplayBatchOp(final HRegion region,
1170    final List<MutationReplay> mutations, long replaySeqId) throws IOException {
1171    long before = EnvironmentEdgeManager.currentTime();
1172    boolean batchContainsPuts = false, batchContainsDelete = false;
1173    try {
1174      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
1175        MutationReplay m = it.next();
1176
1177        if (m.getType() == MutationType.PUT) {
1178          batchContainsPuts = true;
1179        } else {
1180          batchContainsDelete = true;
1181        }
1182
1183        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
1184        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
1185        if (metaCells != null && !metaCells.isEmpty()) {
1186          for (Cell metaCell : metaCells) {
1187            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
1188            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1189            HRegion hRegion = region;
1190            if (compactionDesc != null) {
1191              // replay the compaction. Remove the files from stores only if we are the primary
1192              // region replica (thus own the files)
1193              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
1194                replaySeqId);
1195              continue;
1196            }
1197            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
1198            if (flushDesc != null && !isDefaultReplica) {
1199              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
1200              continue;
1201            }
1202            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
1203            if (regionEvent != null && !isDefaultReplica) {
1204              hRegion.replayWALRegionEventMarker(regionEvent);
1205              continue;
1206            }
1207            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
1208            if (bulkLoadEvent != null) {
1209              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
1210              continue;
1211            }
1212          }
1213          it.remove();
1214        }
1215      }
1216      requestCount.increment();
1217      if (!region.getRegionInfo().isMetaRegion()) {
1218        regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
1219      }
1220      return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
1221        replaySeqId);
1222    } finally {
1223      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1224    }
1225  }
1226
1227  private void closeAllScanners() {
1228    // Close any outstanding scanners. Means they'll get an UnknownScanner
1229    // exception next time they come in.
1230    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
1231      try {
1232        e.getValue().s.close();
1233      } catch (IOException ioe) {
1234        LOG.warn("Closing scanner " + e.getKey(), ioe);
1235      }
1236    }
1237  }
1238
  /**
   * Constructor. Directly invoked only for testing; in production it is built as part of
   * server startup.
   * <p>
   * Resolves the address to bind (master vs. regionserver configuration keys), instantiates the
   * configured {@link RpcSchedulerFactory}, creates and wires up the RPC server, and reads the
   * scanner-lease / rpc-timeout settings.
   * @param rs the server (region server, or master via subclass) this service fronts
   * @throws IOException if the RPC server cannot be created or its listener channel is closed
   */
  public RSRpcServices(final HRegionServer rs) throws IOException {
    final Configuration conf = rs.getConfiguration();
    regionServer = rs;
    final RpcSchedulerFactory rpcSchedulerFactory;
    try {
      rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class)
        .getDeclaredConstructor().newInstance();
    } catch (NoSuchMethodException | InvocationTargetException | InstantiationException
      | IllegalAccessException e) {
      // A scheduler factory class that cannot be reflectively built is a configuration error.
      throw new IllegalArgumentException(e);
    }
    // Server to handle client requests.
    final InetSocketAddress initialIsa;
    final InetSocketAddress bindAddress;
    if (this instanceof MasterRpcServices) {
      String hostname = DNS.getHostname(conf, DNS.ServerType.MASTER);
      int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress = new InetSocketAddress(conf.get("hbase.master.ipc.address", hostname), port);
    } else {
      String hostname = DNS.getHostname(conf, DNS.ServerType.REGIONSERVER);
      int port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress =
        new InetSocketAddress(conf.get("hbase.regionserver.ipc.address", hostname), port);
    }
    if (initialIsa.getAddress() == null) {
      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
    }
    priority = createPriority();
    // Using Address means we don't get the IP too. Shorten it more even to just the host name
    // w/o the domain.
    final String name = rs.getProcessName() + "/"
      + Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain();
    // Set how many times to retry talking to another server over Connection.
    ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG);
    rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name);
    rpcServer.setRsRpcServices(this);
    if (!(rs instanceof HMaster)) {
      // The master wires its own named-queue recorder; only plain region servers set it here.
      rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
    }
    setReloadableGuardrails(conf);
    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);

    final InetSocketAddress address = rpcServer.getListenerAddress();
    if (address == null) {
      throw new IOException("Listener channel is closed");
    }
    // Set our address, however we need the final port that was given to rpcServer
    isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
    rpcServer.setErrorHandler(this);
    rs.setName(name);

    // NOTE(review): cache of recently-closed scanner ids, kept around for one lease period —
    // presumably so post-close accesses can be distinguished from unknown ids; confirm at usages.
    closedScanners = CacheBuilder.newBuilder()
      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }
1303
  /**
   * Create the RPC server fronting this service.
   * @param server              the hosting server, source of configuration
   * @param rpcSchedulerFactory factory for the request scheduler
   * @param bindAddress         the already-resolved address to bind
   * @param name                server name used for thread naming/logging
   * @throws IOException if the port is already taken (with a hint on how to change it) or the
   *           server cannot otherwise be created
   */
  protected RpcServerInterface createRpcServer(final Server server,
    final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
    final String name) throws IOException {
    final Configuration conf = server.getConfiguration();
    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
    try {
      // Use the final bindAddress for this server.
      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress,
        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
    } catch (BindException be) {
      throw new IOException(be.getMessage() + ". To switch ports use the '"
        + HConstants.REGIONSERVER_PORT + "' configuration property.",
        be.getCause() != null ? be.getCause() : be);
    }
  }
1321
  /**
   * Returns the {@link RpcSchedulerFactory} implementation class to use, as configured under
   * {@code REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS}; defaults to SimpleRpcSchedulerFactory.
   */
  protected Class<?> getRpcSchedulerFactoryClass() {
    final Configuration conf = regionServer.getConfiguration();
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }
1327
1328  @Override
1329  public void onConfigurationChange(Configuration newConf) {
1330    if (rpcServer instanceof ConfigurationObserver) {
1331      ((ConfigurationObserver) rpcServer).onConfigurationChange(newConf);
1332      setReloadableGuardrails(newConf);
1333    }
1334  }
1335
  /**
   * Creates the priority function used to route requests to handler pools; subclasses (e.g. the
   * master) override to supply their own.
   */
  protected PriorityFunction createPriority() {
    return new AnnotationReadingPriorityFunction(this);
  }
1339
1340  protected void requirePermission(String request, Permission.Action perm) throws IOException {
1341    if (accessChecker != null) {
1342      accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), request, null, perm);
1343    }
1344  }
1345
  /** Returns the number of currently outstanding scanners on this server. */
  public int getScannersCount() {
    return scanners.size();
  }
1349
1350  /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
1351  RegionScanner getScanner(long scannerId) {
1352    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
1353    return rsh == null ? null : rsh.s;
1354  }
1355
  /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null. */
  private RegionScannerHolder getRegionScannerHolder(long scannerId) {
    // Scanners are keyed by the string form of the id; see toScannerName.
    return scanners.get(toScannerName(scannerId));
  }
1360
1361  public String getScanDetailsWithId(long scannerId) {
1362    RegionScanner scanner = getScanner(scannerId);
1363    if (scanner == null) {
1364      return null;
1365    }
1366    StringBuilder builder = new StringBuilder();
1367    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
1368    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
1369    builder.append(" operation_id: ").append(scanner.getOperationId());
1370    return builder.toString();
1371  }
1372
1373  public String getScanDetailsWithRequest(ScanRequest request) {
1374    try {
1375      if (!request.hasRegion()) {
1376        return null;
1377      }
1378      Region region = getRegion(request.getRegion());
1379      StringBuilder builder = new StringBuilder();
1380      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
1381      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
1382      for (NameBytesPair pair : request.getScan().getAttributeList()) {
1383        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
1384          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
1385          break;
1386        }
1387      }
1388      return builder.toString();
1389    } catch (IOException ignored) {
1390      return null;
1391    }
1392  }
1393
1394  /**
1395   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
1396   */
1397  long getScannerVirtualTime(long scannerId) {
1398    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
1399    return rsh == null ? 0L : rsh.getNextCallSeq();
1400  }
1401
1402  /**
1403   * Method to account for the size of retained cells.
1404   * @param context rpc call context
1405   * @param r       result to add size.
1406   * @return an object that represents the last referenced block from this response.
1407   */
1408  void addSize(RpcCallContext context, Result r) {
1409    if (context != null && r != null && !r.isEmpty()) {
1410      for (Cell c : r.rawCells()) {
1411        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
1412      }
1413    }
1414  }
1415
1416  /** Returns Remote client's ip and port else null if can't be determined. */
1417  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
1418      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
1419  static String getRemoteClientIpAndPort() {
1420    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
1421    if (rpcCall == null) {
1422      return HConstants.EMPTY_STRING;
1423    }
1424    InetAddress address = rpcCall.getRemoteAddress();
1425    if (address == null) {
1426      return HConstants.EMPTY_STRING;
1427    }
1428    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
1429    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
1430    // scanning than a hostname anyways.
1431    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
1432  }
1433
1434  /** Returns Remote client's username. */
1435  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
1436      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
1437  static String getUserName() {
1438    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
1439    if (rpcCall == null) {
1440      return HConstants.EMPTY_STRING;
1441    }
1442    return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING);
1443  }
1444
  /**
   * Register a newly created scanner under {@code scannerName}: create its lease, wire up the
   * shipped/close callbacks, and publish the holder in the scanners map.
   * @param scannerName    unique id for the scanner (unique for this server's whole lifetime)
   * @param s              the scanner to register
   * @param shipper        releases held resources once a response has been shipped to the client
   * @param r              the region being scanned
   * @param needCursor     whether cursor results were requested by the client
   * @param fullRegionScan whether the scan covers the entire region
   * @return the holder now tracking this scanner
   * @throws LeaseStillHeldException if a lease already exists under this name
   */
  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
    HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
    Lease lease = regionServer.getLeaseManager().createLease(scannerName,
      this.scannerLeaseTimeoutPeriod, new ScannerListener(scannerName));
    // A scanner that is itself an RpcCallback handles its own close; otherwise wrap it.
    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
    RpcCallback closeCallback =
      s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s);
    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
      needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
      + scannerName + ", " + existing;
    return rsh;
  }
1459
1460  private boolean isFullRegionScan(Scan scan, HRegion region) {
1461    // If the scan start row equals or less than the start key of the region
1462    // and stop row greater than equals end key (if stop row present)
1463    // or if the stop row is empty
1464    // account this as a full region scan
1465    if (
1466      Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0
1467        && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0
1468          && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)
1469          || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
1470    ) {
1471      return true;
1472    }
1473    return false;
1474  }
1475
1476  /**
1477   * Find the HRegion based on a region specifier
1478   * @param regionSpecifier the region specifier
1479   * @return the corresponding region
1480   * @throws IOException if the specifier is not null, but failed to find the region
1481   */
1482  public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException {
1483    return regionServer.getRegion(regionSpecifier.getValue().toByteArray());
1484  }
1485
1486  /**
1487   * Find the List of HRegions based on a list of region specifiers
1488   * @param regionSpecifiers the list of region specifiers
1489   * @return the corresponding list of regions
1490   * @throws IOException if any of the specifiers is not null, but failed to find the region
1491   */
1492  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
1493    final CacheEvictionStatsBuilder stats) {
1494    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
1495    for (RegionSpecifier regionSpecifier : regionSpecifiers) {
1496      try {
1497        regions.add(regionServer.getRegion(regionSpecifier.getValue().toByteArray()));
1498      } catch (NotServingRegionException e) {
1499        stats.addException(regionSpecifier.getValue().toByteArray(), e);
1500      }
1501    }
1502    return regions;
1503  }
1504
  /** Returns the priority function used to dispatch requests. */
  public PriorityFunction getPriority() {
    return priority;
  }
1508
  /** Returns the hosting server's configuration. */
  public Configuration getConfiguration() {
    return regionServer.getConfiguration();
  }
1512
  /** Returns the region server's RPC quota manager. */
  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return regionServer.getRegionServerRpcQuotaManager();
  }
1516
  /** Returns the region server's space quota manager. */
  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return regionServer.getRegionServerSpaceQuotaManager();
  }
1520
  /**
   * Start this service: install the access checker (real or no-op depending on whether
   * authorization is configured), start the ZooKeeper permission watcher, seed the scanner id
   * generator, and finally start the RPC server so it begins accepting requests.
   * @param zkWatcher ZooKeeper watcher backing the permission watcher
   */
  void start(ZKWatcher zkWatcher) {
    if (AccessChecker.isAuthorizationSupported(getConfiguration())) {
      accessChecker = new AccessChecker(getConfiguration());
    } else {
      accessChecker = new NoopAccessChecker(getConfiguration());
    }
    zkPermissionWatcher =
      new ZKPermissionWatcher(zkWatcher, accessChecker.getAuthManager(), getConfiguration());
    try {
      zkPermissionWatcher.start();
    } catch (KeeperException e) {
      // Deliberately non-fatal: the server still comes up, but permission updates published via
      // ZooKeeper will not be observed.
      LOG.error("ZooKeeper permission watcher initialization failed", e);
    }
    this.scannerIdGenerator = new ScannerIdGenerator(this.regionServer.serverName);
    rpcServer.start();
  }
1537
  /**
   * Stop this service: close the ZK permission watcher (if it was started), close all
   * outstanding scanners, then shut down the RPC server.
   */
  void stop() {
    if (zkPermissionWatcher != null) {
      zkPermissionWatcher.close();
    }
    closeAllScanners();
    rpcServer.stop();
  }
1545
1546  /**
1547   * Called to verify that this server is up and running.
1548   */
1549  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
1550  protected void checkOpen() throws IOException {
1551    if (regionServer.isAborted()) {
1552      throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
1553    }
1554    if (regionServer.isStopped()) {
1555      throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
1556    }
1557    if (!regionServer.isDataFileSystemOk()) {
1558      throw new RegionServerStoppedException("File system not available");
1559    }
1560    if (!regionServer.isOnline()) {
1561      throw new ServerNotRunningYetException(
1562        "Server " + regionServer.serverName + " is not running yet");
1563    }
1564  }
1565
1566  /**
1567   * By default, put up an Admin and a Client Service. Set booleans
1568   * <code>hbase.regionserver.admin.executorService</code> and
1569   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
1570   * Default is that both are enabled.
1571   * @return immutable list of blocking services and the security info classes that this server
1572   *         supports
1573   */
1574  protected List<BlockingServiceAndInterface> getServices() {
1575    boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
1576    boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
1577    boolean clientMeta =
1578      getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
1579    boolean bootstrapNodes =
1580      getConfiguration().getBoolean(REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG, true);
1581    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
1582    if (client) {
1583      bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
1584        ClientService.BlockingInterface.class));
1585    }
1586    if (admin) {
1587      bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
1588        AdminService.BlockingInterface.class));
1589    }
1590    if (clientMeta) {
1591      bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
1592        ClientMetaService.BlockingInterface.class));
1593    }
1594    if (bootstrapNodes) {
1595      bssi.add(
1596        new BlockingServiceAndInterface(BootstrapNodeService.newReflectiveBlockingService(this),
1597          BootstrapNodeService.BlockingInterface.class));
1598    }
1599    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
1600  }
1601
  /** Returns the address this server answers on (with the final port given to the rpc server). */
  public InetSocketAddress getSocketAddress() {
    return isa;
  }
1605
  /** Delegates to the configured priority function. */
  @Override
  public int getPriority(RequestHeader header, Message param, User user) {
    return priority.getPriority(header, param, user);
  }
1610
  /** Delegates deadline computation to the configured priority function. */
  @Override
  public long getDeadline(RequestHeader header, Message param) {
    return priority.getDeadline(header, param);
  }
1615
1616  /*
1617   * Check if an OOME and, if so, abort immediately to avoid creating more objects.
1618   * @return True if we OOME'd and are aborting.
1619   */
1620  @Override
1621  public boolean checkOOME(final Throwable e) {
1622    return exitIfOOME(e);
1623  }
1624
1625  public static boolean exitIfOOME(final Throwable e) {
1626    boolean stop = false;
1627    try {
1628      if (
1629        e instanceof OutOfMemoryError
1630          || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1631          || (e.getMessage() != null && e.getMessage().contains("java.lang.OutOfMemoryError"))
1632      ) {
1633        stop = true;
1634        LOG.error(HBaseMarkers.FATAL, "Run out of memory; " + RSRpcServices.class.getSimpleName()
1635          + " will abort itself immediately", e);
1636      }
1637    } finally {
1638      if (stop) {
1639        Runtime.getRuntime().halt(1);
1640      }
1641    }
1642    return stop;
1643  }
1644
1645  /**
1646   * Close a region on the region server.
1647   * @param controller the RPC controller
1648   * @param request    the request
1649   */
1650  @Override
1651  @QosPriority(priority = HConstants.ADMIN_QOS)
1652  public CloseRegionResponse closeRegion(final RpcController controller,
1653    final CloseRegionRequest request) throws ServiceException {
1654    final ServerName sn = (request.hasDestinationServer()
1655      ? ProtobufUtil.toServerName(request.getDestinationServer())
1656      : null);
1657
1658    try {
1659      checkOpen();
1660      throwOnWrongStartCode(request);
1661      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1662
1663      requestCount.increment();
1664      if (sn == null) {
1665        LOG.info("Close " + encodedRegionName + " without moving");
1666      } else {
1667        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1668      }
1669      boolean closed = regionServer.closeRegion(encodedRegionName, false, sn);
1670      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1671      return builder.build();
1672    } catch (IOException ie) {
1673      throw new ServiceException(ie);
1674    }
1675  }
1676
1677  /**
1678   * Compact a region on the region server.
1679   * @param controller the RPC controller
1680   * @param request    the request
1681   */
1682  @Override
1683  @QosPriority(priority = HConstants.ADMIN_QOS)
1684  public CompactRegionResponse compactRegion(final RpcController controller,
1685    final CompactRegionRequest request) throws ServiceException {
1686    try {
1687      checkOpen();
1688      requestCount.increment();
1689      HRegion region = getRegion(request.getRegion());
1690      // Quota support is enabled, the requesting user is not system/super user
1691      // and a quota policy is enforced that disables compactions.
1692      if (
1693        QuotaUtil.isQuotaEnabled(getConfiguration())
1694          && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null))
1695          && this.regionServer.getRegionServerSpaceQuotaManager()
1696            .areCompactionsDisabled(region.getTableDescriptor().getTableName())
1697      ) {
1698        throw new DoNotRetryIOException(
1699          "Compactions on this region are " + "disabled due to a space quota violation.");
1700      }
1701      region.startRegionOperation(Operation.COMPACT_REGION);
1702      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1703      boolean major = request.hasMajor() && request.getMajor();
1704      if (request.hasFamily()) {
1705        byte[] family = request.getFamily().toByteArray();
1706        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
1707          + region.getRegionInfo().getRegionNameAsString() + " and family "
1708          + Bytes.toString(family);
1709        LOG.trace(log);
1710        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
1711          CompactionLifeCycleTracker.DUMMY);
1712      } else {
1713        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
1714          + region.getRegionInfo().getRegionNameAsString();
1715        LOG.trace(log);
1716        region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
1717      }
1718      return CompactRegionResponse.newBuilder().build();
1719    } catch (IOException ie) {
1720      throw new ServiceException(ie);
1721    }
1722  }
1723
  /**
   * Enable or disable compactions on this region server, returning the previous state so the
   * caller can restore it later. A no-op when the requested state matches the current one.
   */
  @Override
  public CompactionSwitchResponse compactionSwitch(RpcController controller,
    CompactionSwitchRequest request) throws ServiceException {
    rpcPreCheck("compactionSwitch");
    final CompactSplit compactSplitThread = regionServer.getCompactSplitThread();
    requestCount.increment();
    boolean prevState = compactSplitThread.isCompactionsEnabled();
    CompactionSwitchResponse response =
      CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
    if (prevState == request.getEnabled()) {
      // passed in requested state is same as current state. No action required
      return response;
    }
    compactSplitThread.switchCompaction(request.getEnabled());
    return response;
  }
1740
1741  /**
1742   * Flush a region on the region server.
1743   * @param controller the RPC controller
1744   * @param request    the request
1745   */
1746  @Override
1747  @QosPriority(priority = HConstants.ADMIN_QOS)
1748  public FlushRegionResponse flushRegion(final RpcController controller,
1749    final FlushRegionRequest request) throws ServiceException {
1750    try {
1751      checkOpen();
1752      requestCount.increment();
1753      HRegion region = getRegion(request.getRegion());
1754      LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1755      boolean shouldFlush = true;
1756      if (request.hasIfOlderThanTs()) {
1757        shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1758      }
1759      FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1760      if (shouldFlush) {
1761        boolean writeFlushWalMarker =
1762          request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false;
1763        // Go behind the curtain so we can manage writing of the flush WAL marker
1764        HRegion.FlushResultImpl flushResult = null;
1765        if (request.hasFamily()) {
1766          List families = new ArrayList();
1767          families.add(request.getFamily().toByteArray());
1768          flushResult =
1769            region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1770        } else {
1771          flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1772        }
1773        boolean compactionNeeded = flushResult.isCompactionNeeded();
1774        if (compactionNeeded) {
1775          regionServer.getCompactSplitThread().requestSystemCompaction(region,
1776            "Compaction through user triggered flush");
1777        }
1778        builder.setFlushed(flushResult.isFlushSucceeded());
1779        builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1780      }
1781      builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1782      return builder.build();
1783    } catch (DroppedSnapshotException ex) {
1784      // Cache flush can fail in a few places. If it fails in a critical
1785      // section, we get a DroppedSnapshotException and a replay of wal
1786      // is required. Currently the only way to do this is a restart of
1787      // the server.
1788      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1789      throw new ServiceException(ex);
1790    } catch (IOException ie) {
1791      throw new ServiceException(ie);
1792    }
1793  }
1794
1795  @Override
1796  @QosPriority(priority = HConstants.ADMIN_QOS)
1797  public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1798    final GetOnlineRegionRequest request) throws ServiceException {
1799    try {
1800      checkOpen();
1801      requestCount.increment();
1802      Map<String, HRegion> onlineRegions = regionServer.getOnlineRegions();
1803      List<RegionInfo> list = new ArrayList<>(onlineRegions.size());
1804      for (HRegion region : onlineRegions.values()) {
1805        list.add(region.getRegionInfo());
1806      }
1807      list.sort(RegionInfo.COMPARATOR);
1808      return ResponseConverter.buildGetOnlineRegionResponse(list);
1809    } catch (IOException ie) {
1810      throw new ServiceException(ie);
1811    }
1812  }
1813
  // Master implementation of this Admin Service differs given it is not
  // able to supply detail only known to RegionServer. See note on
  // MasterRpcServers#getRegionInfo.
  /**
   * Return region metadata (info, compaction state, splittability/mergeability and, on request,
   * the best split row). When the best split row is requested but all data is still in the
   * memstore, the region is flushed once and the split check retried.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
    final GetRegionInfoRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      RegionInfo info = region.getRegionInfo();
      byte[] bestSplitRow;
      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
        bestSplitRow = region.checkSplit(true).orElse(null);
        // when all table data are in memstore, bestSplitRow = null
        // try to flush region first
        if (bestSplitRow == null) {
          region.flush(true);
          bestSplitRow = region.checkSplit(true).orElse(null);
        }
      } else {
        bestSplitRow = null;
      }
      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
      if (request.hasCompactionState() && request.getCompactionState()) {
        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
      }
      builder.setSplittable(region.isSplittable());
      builder.setMergeable(region.isMergeable());
      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }
1853
  /**
   * Report the load of every region on this server, or only of those belonging to the table
   * named in the request.
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
    throws ServiceException {

    List<HRegion> regions;
    if (request.hasTableName()) {
      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
      regions = regionServer.getRegions(tableName);
    } else {
      regions = regionServer.getRegions();
    }
    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
    // Builders are reused across regions; createRegionLoad resets them per call.
    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();

    try {
      for (HRegion region : regions) {
        rLoads.add(regionServer.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
      }
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
    builder.addAllRegionLoads(rLoads);
    return builder.build();
  }
1881
  /**
   * Clear the "long" and/or "short" compaction queues named in the request. Only one such
   * operation may run at a time; concurrent requests are refused (logged, empty response).
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
    ClearCompactionQueuesRequest request) throws ServiceException {
    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
      + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
    requestCount.increment();
    // CAS gate: only one admin may clear queues at a time; the flag is reset in the finally.
    if (clearCompactionQueues.compareAndSet(false, true)) {
      final CompactSplit compactSplitThread = regionServer.getCompactSplitThread();
      try {
        checkOpen();
        regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues();
        for (String queueName : request.getQueueNameList()) {
          LOG.debug("clear " + queueName + " compaction queue");
          switch (queueName) {
            case "long":
              compactSplitThread.clearLongCompactionsQueue();
              break;
            case "short":
              compactSplitThread.clearShortCompactionsQueue();
              break;
            default:
              LOG.warn("Unknown queue name " + queueName);
              throw new IOException("Unknown queue name " + queueName);
          }
        }
        regionServer.getRegionServerCoprocessorHost().postClearCompactionQueues();
      } catch (IOException ie) {
        throw new ServiceException(ie);
      } finally {
        clearCompactionQueues.set(false);
      }
    } else {
      LOG.warn("Clear compactions queue is executing by other admin.");
    }
    return respBuilder.build();
  }
1920
1921  /**
1922   * Get some information of the region server.
1923   * @param controller the RPC controller
1924   * @param request    the request
1925   */
1926  @Override
1927  @QosPriority(priority = HConstants.ADMIN_QOS)
1928  public GetServerInfoResponse getServerInfo(final RpcController controller,
1929    final GetServerInfoRequest request) throws ServiceException {
1930    try {
1931      checkOpen();
1932    } catch (IOException ie) {
1933      throw new ServiceException(ie);
1934    }
1935    requestCount.increment();
1936    int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
1937    return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
1938  }
1939
  /**
   * List the store file paths of a region, restricted to the column families named in the
   * request (all families when none are given).
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public GetStoreFileResponse getStoreFile(final RpcController controller,
    final GetStoreFileRequest request) throws ServiceException {
    try {
      checkOpen();
      HRegion region = getRegion(request.getRegion());
      requestCount.increment();
      Set<byte[]> columnFamilies;
      if (request.getFamilyCount() == 0) {
        // No families specified: cover every family in the table.
        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
      } else {
        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
        for (ByteString cf : request.getFamilyList()) {
          columnFamilies.add(cf.toByteArray());
        }
      }
      int nCF = columnFamilies.size();
      List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
      builder.addAllStoreFile(fileList);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }
1966
1967  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
1968    if (!request.hasServerStartCode()) {
1969      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
1970      return;
1971    }
1972    throwOnWrongStartCode(request.getServerStartCode());
1973  }
1974
1975  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
1976    if (!request.hasServerStartCode()) {
1977      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
1978      return;
1979    }
1980    throwOnWrongStartCode(request.getServerStartCode());
1981  }
1982
1983  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
1984    // check that we are the same server that this RPC is intended for.
1985    if (regionServer.serverName.getStartcode() != serverStartCode) {
1986      throw new ServiceException(new DoNotRetryIOException(
1987        "This RPC was intended for a " + "different server with startCode: " + serverStartCode
1988          + ", this server is: " + regionServer.serverName));
1989    }
1990  }
1991
1992  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
1993    if (req.getOpenRegionCount() > 0) {
1994      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
1995        throwOnWrongStartCode(openReq);
1996      }
1997    }
1998    if (req.getCloseRegionCount() > 0) {
1999      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
2000        throwOnWrongStartCode(closeReq);
2001      }
2002    }
2003  }
2004
2005  /**
2006   * Open asynchronously a region or a set of regions on the region server. The opening is
2007   * coordinated by ZooKeeper, and this method requires the znode to be created before being called.
2008   * As a consequence, this method should be called only from the master.
2009   * <p>
2010   * Different manages states for the region are:
2011   * </p>
2012   * <ul>
2013   * <li>region not opened: the region opening will start asynchronously.</li>
2014   * <li>a close is already in progress: this is considered as an error.</li>
2015   * <li>an open is already in progress: this new open request will be ignored. This is important
2016   * because the Master can do multiple requests if it crashes.</li>
2017   * <li>the region is already opened: this new open request will be ignored.</li>
2018   * </ul>
2019   * <p>
2020   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
2021   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
2022   * errors are put in the response as FAILED_OPENING.
2023   * </p>
2024   * @param controller the RPC controller
2025   * @param request    the request
2026   */
2027  @Override
2028  @QosPriority(priority = HConstants.ADMIN_QOS)
2029  public OpenRegionResponse openRegion(final RpcController controller,
2030    final OpenRegionRequest request) throws ServiceException {
2031    requestCount.increment();
2032    throwOnWrongStartCode(request);
2033
2034    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
2035    final int regionCount = request.getOpenInfoCount();
2036    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
2037    final boolean isBulkAssign = regionCount > 1;
2038    try {
2039      checkOpen();
2040    } catch (IOException ie) {
2041      TableName tableName = null;
2042      if (regionCount == 1) {
2043        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
2044          request.getOpenInfo(0).getRegion();
2045        if (ri != null) {
2046          tableName = ProtobufUtil.toTableName(ri.getTableName());
2047        }
2048      }
2049      if (!TableName.META_TABLE_NAME.equals(tableName)) {
2050        throw new ServiceException(ie);
2051      }
2052      // We are assigning meta, wait a little for regionserver to finish initialization.
2053      // Default to quarter of RPC timeout
2054      int timeout = regionServer.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
2055        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
2056      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
2057      synchronized (regionServer.online) {
2058        try {
2059          while (
2060            EnvironmentEdgeManager.currentTime() <= endTime && !regionServer.isStopped()
2061              && !regionServer.isOnline()
2062          ) {
2063            regionServer.online.wait(regionServer.msgInterval);
2064          }
2065          checkOpen();
2066        } catch (InterruptedException t) {
2067          Thread.currentThread().interrupt();
2068          throw new ServiceException(t);
2069        } catch (IOException e) {
2070          throw new ServiceException(e);
2071        }
2072      }
2073    }
2074
2075    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
2076
2077    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
2078      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
2079      TableDescriptor htd;
2080      try {
2081        String encodedName = region.getEncodedName();
2082        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
2083        final HRegion onlineRegion = regionServer.getRegion(encodedName);
2084        if (onlineRegion != null) {
2085          // The region is already online. This should not happen any more.
2086          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
2087            + ", which is already online";
2088          LOG.warn(error);
2089          // regionServer.abort(error);
2090          // throw new IOException(error);
2091          builder.addOpeningState(RegionOpeningState.OPENED);
2092          continue;
2093        }
2094        LOG.info("Open " + region.getRegionNameAsString());
2095
2096        final Boolean previous =
2097          regionServer.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);
2098
2099        if (Boolean.FALSE.equals(previous)) {
2100          if (regionServer.getRegion(encodedName) != null) {
2101            // There is a close in progress. This should not happen any more.
2102            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
2103              + ", which we are already trying to CLOSE";
2104            regionServer.abort(error);
2105            throw new IOException(error);
2106          }
2107          regionServer.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
2108        }
2109
2110        if (Boolean.TRUE.equals(previous)) {
2111          // An open is in progress. This is supported, but let's log this.
2112          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
2113            + ", which we are already trying to OPEN"
2114            + " - ignoring this new request for this region.");
2115        }
2116
2117        // We are opening this region. If it moves back and forth for whatever reason, we don't
2118        // want to keep returning the stale moved record while we are opening/if we close again.
2119        regionServer.removeFromMovedRegions(region.getEncodedName());
2120
2121        if (previous == null || !previous.booleanValue()) {
2122          htd = htds.get(region.getTable());
2123          if (htd == null) {
2124            htd = regionServer.tableDescriptors.get(region.getTable());
2125            htds.put(region.getTable(), htd);
2126          }
2127          if (htd == null) {
2128            throw new IOException("Missing table descriptor for " + region.getEncodedName());
2129          }
2130          // If there is no action in progress, we can submit a specific handler.
2131          // Need to pass the expected version in the constructor.
2132          if (regionServer.executorService == null) {
2133            LOG.info("No executor executorService; skipping open request");
2134          } else {
2135            if (region.isMetaRegion()) {
2136              regionServer.executorService.submit(
2137                new OpenMetaHandler(regionServer, regionServer, region, htd, masterSystemTime));
2138            } else {
2139              if (regionOpenInfo.getFavoredNodesCount() > 0) {
2140                regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
2141                  regionOpenInfo.getFavoredNodesList());
2142              }
2143              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
2144                regionServer.executorService.submit(new OpenPriorityRegionHandler(regionServer,
2145                  regionServer, region, htd, masterSystemTime));
2146              } else {
2147                regionServer.executorService.submit(
2148                  new OpenRegionHandler(regionServer, regionServer, region, htd, masterSystemTime));
2149              }
2150            }
2151          }
2152        }
2153
2154        builder.addOpeningState(RegionOpeningState.OPENED);
2155      } catch (IOException ie) {
2156        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
2157        if (isBulkAssign) {
2158          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
2159        } else {
2160          throw new ServiceException(ie);
2161        }
2162      }
2163    }
2164    return builder.build();
2165  }
2166
2167  /**
2168   * Warmup a region on this server. This method should only be called by Master. It synchronously
2169   * opens the region and closes the region bringing the most important pages in cache.
2170   */
2171  @Override
2172  public WarmupRegionResponse warmupRegion(final RpcController controller,
2173    final WarmupRegionRequest request) throws ServiceException {
2174    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
2175    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
2176    try {
2177      checkOpen();
2178      String encodedName = region.getEncodedName();
2179      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
2180      final HRegion onlineRegion = regionServer.getRegion(encodedName);
2181      if (onlineRegion != null) {
2182        LOG.info("{} is online; skipping warmup", region);
2183        return response;
2184      }
2185      TableDescriptor htd = regionServer.tableDescriptors.get(region.getTable());
2186      if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
2187        LOG.info("{} is in transition; skipping warmup", region);
2188        return response;
2189      }
2190      LOG.info("Warmup {}", region.getRegionNameAsString());
2191      HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
2192        regionServer.getConfiguration(), regionServer, null);
2193    } catch (IOException ie) {
2194      LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
2195      throw new ServiceException(ie);
2196    }
2197
2198    return response;
2199  }
2200
2201  /**
2202   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
2203   * that the given mutations will be durable on the receiving RS if this method returns without any
2204   * exception.
2205   * @param controller the RPC controller
2206   * @param request    the request
2207   */
2208  @Override
2209  @QosPriority(priority = HConstants.REPLAY_QOS)
2210  public ReplicateWALEntryResponse replay(final RpcController controller,
2211    final ReplicateWALEntryRequest request) throws ServiceException {
2212    long before = EnvironmentEdgeManager.currentTime();
2213    CellScanner cells = ((HBaseRpcController) controller).cellScanner();
2214    ((HBaseRpcController) controller).setCellScanner(null);
2215    try {
2216      checkOpen();
2217      List<WALEntry> entries = request.getEntryList();
2218      if (entries == null || entries.isEmpty()) {
2219        // empty input
2220        return ReplicateWALEntryResponse.newBuilder().build();
2221      }
2222      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
2223      HRegion region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
2224      RegionCoprocessorHost coprocessorHost =
2225        ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
2226          ? region.getCoprocessorHost()
2227          : null; // do not invoke coprocessors if this is a secondary region replica
2228      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();
2229
2230      // Skip adding the edits to WAL if this is a secondary region replica
2231      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
2232      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
2233
2234      for (WALEntry entry : entries) {
2235        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
2236          throw new NotServingRegionException("Replay request contains entries from multiple "
2237            + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
2238            + entry.getKey().getEncodedRegionName());
2239        }
2240        if (regionServer.nonceManager != null && isPrimary) {
2241          long nonceGroup =
2242            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
2243          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
2244          regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce,
2245            entry.getKey().getWriteTime());
2246        }
2247        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
2248        List<MutationReplay> edits =
2249          WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
2250        if (coprocessorHost != null) {
2251          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
2252          // KeyValue.
2253          if (
2254            coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
2255              walEntry.getSecond())
2256          ) {
2257            // if bypass this log entry, ignore it ...
2258            continue;
2259          }
2260          walEntries.add(walEntry);
2261        }
2262        if (edits != null && !edits.isEmpty()) {
2263          // HBASE-17924
2264          // sort to improve lock efficiency
2265          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
2266          long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
2267            ? entry.getKey().getOrigSequenceNumber()
2268            : entry.getKey().getLogSequenceNumber();
2269          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
2270          // check if it's a partial success
2271          for (int i = 0; result != null && i < result.length; i++) {
2272            if (result[i] != OperationStatus.SUCCESS) {
2273              throw new IOException(result[i].getExceptionMsg());
2274            }
2275          }
2276        }
2277      }
2278
2279      // sync wal at the end because ASYNC_WAL is used above
2280      WAL wal = region.getWAL();
2281      if (wal != null) {
2282        wal.sync();
2283      }
2284
2285      if (coprocessorHost != null) {
2286        for (Pair<WALKey, WALEdit> entry : walEntries) {
2287          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
2288            entry.getSecond());
2289        }
2290      }
2291      return ReplicateWALEntryResponse.newBuilder().build();
2292    } catch (IOException ie) {
2293      throw new ServiceException(ie);
2294    } finally {
2295      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
2296      if (metricsRegionServer != null) {
2297        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
2298      }
2299    }
2300  }
2301
2302  /**
2303   * Replicate WAL entries on the region server.
2304   * @param controller the RPC controller
2305   * @param request    the request
2306   */
2307  @Override
2308  @QosPriority(priority = HConstants.REPLICATION_QOS)
2309  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
2310    final ReplicateWALEntryRequest request) throws ServiceException {
2311    try {
2312      checkOpen();
2313      if (regionServer.getReplicationSinkService() != null) {
2314        requestCount.increment();
2315        List<WALEntry> entries = request.getEntryList();
2316        CellScanner cellScanner = ((HBaseRpcController) controller).cellScanner();
2317        ((HBaseRpcController) controller).setCellScanner(null);
2318        regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries();
2319        regionServer.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
2320          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
2321          request.getSourceHFileArchiveDirPath());
2322        regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries();
2323        return ReplicateWALEntryResponse.newBuilder().build();
2324      } else {
2325        throw new ServiceException("Replication services are not initialized yet");
2326      }
2327    } catch (IOException ie) {
2328      throw new ServiceException(ie);
2329    }
2330  }
2331
2332  /**
2333   * Roll the WAL writer of the region server.
2334   * @param controller the RPC controller
2335   * @param request    the request
2336   */
2337  @Override
2338  public RollWALWriterResponse rollWALWriter(final RpcController controller,
2339    final RollWALWriterRequest request) throws ServiceException {
2340    try {
2341      checkOpen();
2342      requestCount.increment();
2343      regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
2344      regionServer.getWalRoller().requestRollAll();
2345      regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
2346      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
2347      return builder.build();
2348    } catch (IOException ie) {
2349      throw new ServiceException(ie);
2350    }
2351  }
2352
2353  /**
2354   * Stop the region server.
2355   * @param controller the RPC controller
2356   * @param request    the request
2357   */
2358  @Override
2359  @QosPriority(priority = HConstants.ADMIN_QOS)
2360  public StopServerResponse stopServer(final RpcController controller,
2361    final StopServerRequest request) throws ServiceException {
2362    rpcPreCheck("stopServer");
2363    requestCount.increment();
2364    String reason = request.getReason();
2365    regionServer.stop(reason);
2366    return StopServerResponse.newBuilder().build();
2367  }
2368
2369  @Override
2370  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
2371    UpdateFavoredNodesRequest request) throws ServiceException {
2372    rpcPreCheck("updateFavoredNodes");
2373    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
2374    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
2375    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
2376      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
2377      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
2378        regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
2379          regionUpdateInfo.getFavoredNodesList());
2380      }
2381    }
2382    respBuilder.setResponse(openInfoList.size());
2383    return respBuilder.build();
2384  }
2385
2386  /**
2387   * Atomically bulk load several HFiles into an open region
2388   * @return true if successful, false is failed but recoverably (no action)
2389   * @throws ServiceException if failed unrecoverably
2390   */
2391  @Override
2392  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
2393    final BulkLoadHFileRequest request) throws ServiceException {
2394    long start = EnvironmentEdgeManager.currentTime();
2395    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
2396    if (clusterIds.contains(this.regionServer.clusterId)) {
2397      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
2398    } else {
2399      clusterIds.add(this.regionServer.clusterId);
2400    }
2401    try {
2402      checkOpen();
2403      requestCount.increment();
2404      HRegion region = getRegion(request.getRegion());
2405      Map<byte[], List<Path>> map = null;
2406      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
2407      long sizeToBeLoaded = -1;
2408
2409      // Check to see if this bulk load would exceed the space quota for this table
2410      if (spaceQuotaEnabled) {
2411        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
2412        SpaceViolationPolicyEnforcement enforcement =
2413          activeSpaceQuotas.getPolicyEnforcement(region);
2414        if (enforcement != null) {
2415          // Bulk loads must still be atomic. We must enact all or none.
2416          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
2417          for (FamilyPath familyPath : request.getFamilyPathList()) {
2418            filePaths.add(familyPath.getPath());
2419          }
2420          // Check if the batch of files exceeds the current quota
2421          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
2422        }
2423      }
2424
2425      List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
2426      for (FamilyPath familyPath : request.getFamilyPathList()) {
2427        familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath()));
2428      }
2429      if (!request.hasBulkToken()) {
2430        if (region.getCoprocessorHost() != null) {
2431          region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
2432        }
2433        try {
2434          map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
2435            request.getCopyFile(), clusterIds, request.getReplicate());
2436        } finally {
2437          if (region.getCoprocessorHost() != null) {
2438            region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
2439          }
2440        }
2441      } else {
2442        // secure bulk load
2443        map =
2444          regionServer.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
2445      }
2446      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
2447      builder.setLoaded(map != null);
2448      if (map != null) {
2449        // Treat any negative size as a flag to "ignore" updating the region size as that is
2450        // not possible to occur in real life (cannot bulk load a file with negative size)
2451        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
2452          if (LOG.isTraceEnabled()) {
2453            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
2454              + sizeToBeLoaded + " bytes");
2455          }
2456          // Inform space quotas of the new files for this region
2457          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
2458            sizeToBeLoaded);
2459        }
2460      }
2461      return builder.build();
2462    } catch (IOException ie) {
2463      throw new ServiceException(ie);
2464    } finally {
2465      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
2466      if (metricsRegionServer != null) {
2467        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
2468      }
2469    }
2470  }
2471
2472  @Override
2473  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
2474    PrepareBulkLoadRequest request) throws ServiceException {
2475    try {
2476      checkOpen();
2477      requestCount.increment();
2478
2479      HRegion region = getRegion(request.getRegion());
2480
2481      String bulkToken = regionServer.getSecureBulkLoadManager().prepareBulkLoad(region, request);
2482      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
2483      builder.setBulkToken(bulkToken);
2484      return builder.build();
2485    } catch (IOException ie) {
2486      throw new ServiceException(ie);
2487    }
2488  }
2489
2490  @Override
2491  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
2492    CleanupBulkLoadRequest request) throws ServiceException {
2493    try {
2494      checkOpen();
2495      requestCount.increment();
2496
2497      HRegion region = getRegion(request.getRegion());
2498
2499      regionServer.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
2500      return CleanupBulkLoadResponse.newBuilder().build();
2501    } catch (IOException ie) {
2502      throw new ServiceException(ie);
2503    }
2504  }
2505
2506  @Override
2507  public CoprocessorServiceResponse execService(final RpcController controller,
2508    final CoprocessorServiceRequest request) throws ServiceException {
2509    try {
2510      checkOpen();
2511      requestCount.increment();
2512      HRegion region = getRegion(request.getRegion());
2513      com.google.protobuf.Message result = execServiceOnRegion(region, request.getCall());
2514      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
2515      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
2516        region.getRegionInfo().getRegionName()));
2517      // TODO: COPIES!!!!!!
2518      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
2519        org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
2520      return builder.build();
2521    } catch (IOException ie) {
2522      throw new ServiceException(ie);
2523    }
2524  }
2525
2526  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
2527    if (filePaths.isEmpty()) {
2528      // local hdfs
2529      return regionServer.getFileSystem();
2530    }
2531    // source hdfs
2532    return new Path(filePaths.get(0)).getFileSystem(regionServer.getConfiguration());
2533  }
2534
2535  private com.google.protobuf.Message execServiceOnRegion(HRegion region,
2536    final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
2537    // ignore the passed in controller (from the serialized call)
2538    ServerRpcController execController = new ServerRpcController();
2539    return region.execService(execController, serviceCall);
2540  }
2541
2542  /**
2543   * Get data from a table.
2544   * @param controller the RPC controller
2545   * @param request    the get request
2546   */
2547  @Override
2548  public GetResponse get(final RpcController controller, final GetRequest request)
2549    throws ServiceException {
2550    long before = EnvironmentEdgeManager.currentTime();
2551    OperationQuota quota = null;
2552    HRegion region = null;
2553    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2554    try {
2555      checkOpen();
2556      requestCount.increment();
2557      rpcGetRequestCount.increment();
2558      region = getRegion(request.getRegion());
2559
2560      GetResponse.Builder builder = GetResponse.newBuilder();
2561      ClientProtos.Get get = request.getGet();
2562      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
2563      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
2564      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
2565      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
2566      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
2567        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
2568          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
2569          + "reverse Scan.");
2570      }
2571      Boolean existence = null;
2572      Result r = null;
2573      quota = getRpcQuotaManager().checkBatchQuota(region, OperationQuota.OperationType.GET);
2574
2575      Get clientGet = ProtobufUtil.toGet(get);
2576      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2577        existence = region.getCoprocessorHost().preExists(clientGet);
2578      }
2579      if (existence == null) {
2580        if (context != null) {
2581          r = get(clientGet, (region), null, context);
2582        } else {
2583          // for test purpose
2584          r = region.get(clientGet);
2585        }
2586        if (get.getExistenceOnly()) {
2587          boolean exists = r.getExists();
2588          if (region.getCoprocessorHost() != null) {
2589            exists = region.getCoprocessorHost().postExists(clientGet, exists);
2590          }
2591          existence = exists;
2592        }
2593      }
2594      if (existence != null) {
2595        ClientProtos.Result pbr =
2596          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
2597        builder.setResult(pbr);
2598      } else if (r != null) {
2599        ClientProtos.Result pbr;
2600        if (
2601          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
2602            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
2603        ) {
2604          pbr = ProtobufUtil.toResultNoData(r);
2605          ((HBaseRpcController) controller)
2606            .setCellScanner(CellUtil.createCellScanner(r.rawCells()));
2607          addSize(context, r);
2608        } else {
2609          pbr = ProtobufUtil.toResult(r);
2610        }
2611        builder.setResult(pbr);
2612      }
2613      // r.cells is null when an table.exists(get) call
2614      if (r != null && r.rawCells() != null) {
2615        quota.addGetResult(r);
2616      }
2617      return builder.build();
2618    } catch (IOException ie) {
2619      throw new ServiceException(ie);
2620    } finally {
2621      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
2622      if (metricsRegionServer != null && region != null) {
2623        long blockBytesScanned = context != null ? context.getBlockBytesScanned() : 0;
2624        metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
2625          blockBytesScanned);
2626      }
2627      if (quota != null) {
2628        quota.close();
2629      }
2630    }
2631  }
2632
  /**
   * Server-side single-Get implementation shared by the get RPC and multi(). Unlike
   * HRegion#get, the scanner created here is not closed in this method: it is handed off
   * either to the current RpcCallContext (when {@code closeCallBack} is null) or accumulated
   * in {@code closeCallBack}, and the rpc callback closes it after the response is sent.
   * @param get           the client Get to execute
   * @param region        the region to read from
   * @param closeCallBack scanner collector for multi(), or null for a single get
   * @param context       the current rpc call context
   * @return the Result; its exists flag is set only for existence-only gets
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
    RpcCallContext context) throws IOException {
    region.prepareGet(get);
    // Results read from a non-default replica are marked stale.
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    long before = EnvironmentEdgeManager.currentTime();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        // Coprocessor bypassed the get: return whatever it placed in results.
        region.metricsUpdateForGet(results, before);
        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
          stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          // NOTE(review): context is assumed non-null on this path — callers appear to pass a
          // null closeCallBack only when they have a context; confirm.
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet(results, before);

    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
  }
2683
2684  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
2685    int sum = 0;
2686    String firstRegionName = null;
2687    for (RegionAction regionAction : request.getRegionActionList()) {
2688      if (sum == 0) {
2689        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
2690      }
2691      sum += regionAction.getActionCount();
2692    }
2693    if (sum > rowSizeWarnThreshold) {
2694      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
2695        + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
2696        + RpcServer.getRequestUserName().orElse(null) + "/"
2697        + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
2698      if (rejectRowsWithSizeOverThreshold) {
2699        throw new ServiceException(
2700          "Rejecting large batch operation for current batch with firstRegionName: "
2701            + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
2702            + rowSizeWarnThreshold);
2703      }
2704    }
2705  }
2706
2707  private void failRegionAction(MultiResponse.Builder responseBuilder,
2708    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
2709    CellScanner cellScanner, Throwable error) {
2710    rpcServer.getMetrics().exception(error);
2711    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
2712    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2713    // All Mutations in this RegionAction not executed as we can not see the Region online here
2714    // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
2715    // corresponding to these Mutations.
2716    if (cellScanner != null) {
2717      skipCellsForMutations(regionAction.getActionList(), cellScanner);
2718    }
2719  }
2720
2721  /**
2722   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2723   * @param rpcc    the RPC controller
2724   * @param request the multi request
2725   */
2726  @Override
2727  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2728    throws ServiceException {
2729    try {
2730      checkOpen();
2731    } catch (IOException ie) {
2732      throw new ServiceException(ie);
2733    }
2734
2735    checkBatchSizeAndLogLargeSize(request);
2736
2737    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2738    // It is also the conduit via which we pass back data.
2739    HBaseRpcController controller = (HBaseRpcController) rpcc;
2740    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2741    if (controller != null) {
2742      controller.setCellScanner(null);
2743    }
2744
2745    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2746
2747    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2748    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2749    this.rpcMultiRequestCount.increment();
2750    this.requestCount.increment();
2751    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2752
2753    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
2754    // following logic is for backward compatibility as old clients still use
2755    // MultiRequest#condition in case of checkAndMutate with RowMutations.
2756    if (request.hasCondition()) {
2757      if (request.getRegionActionList().isEmpty()) {
2758        // If the region action list is empty, do nothing.
2759        responseBuilder.setProcessed(true);
2760        return responseBuilder.build();
2761      }
2762
2763      RegionAction regionAction = request.getRegionAction(0);
2764
2765      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
2766      // we can assume regionAction.getAtomic() is true here.
2767      assert regionAction.getAtomic();
2768
2769      OperationQuota quota;
2770      HRegion region;
2771      RegionSpecifier regionSpecifier = regionAction.getRegion();
2772
2773      try {
2774        region = getRegion(regionSpecifier);
2775        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
2776          regionAction.hasCondition());
2777      } catch (IOException e) {
2778        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2779        return responseBuilder.build();
2780      }
2781
2782      try {
2783        CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2784          cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
2785        responseBuilder.setProcessed(result.isSuccess());
2786        ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2787          ClientProtos.ResultOrException.newBuilder();
2788        for (int i = 0; i < regionAction.getActionCount(); i++) {
2789          // To unify the response format with doNonAtomicRegionMutation and read through
2790          // client's AsyncProcess we have to add an empty result instance per operation
2791          resultOrExceptionOrBuilder.clear();
2792          resultOrExceptionOrBuilder.setIndex(i);
2793          regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2794        }
2795      } catch (IOException e) {
2796        rpcServer.getMetrics().exception(e);
2797        // As it's an atomic operation with a condition, we may expect it's a global failure.
2798        regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2799      } finally {
2800        quota.close();
2801      }
2802
2803      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2804      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2805      if (regionLoadStats != null) {
2806        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
2807          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
2808      }
2809      return responseBuilder.build();
2810    }
2811
2812    // this will contain all the cells that we need to return. It's created later, if needed.
2813    List<CellScannable> cellsToReturn = null;
2814    RegionScannersCloseCallBack closeCallBack = null;
2815    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2816    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
2817      new HashMap<>(request.getRegionActionCount());
2818
2819    for (RegionAction regionAction : request.getRegionActionList()) {
2820      OperationQuota quota;
2821      HRegion region;
2822      RegionSpecifier regionSpecifier = regionAction.getRegion();
2823      regionActionResultBuilder.clear();
2824
2825      try {
2826        region = getRegion(regionSpecifier);
2827        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
2828          regionAction.hasCondition());
2829      } catch (IOException e) {
2830        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2831        continue; // For this region it's a failure.
2832      }
2833
2834      try {
2835        if (regionAction.hasCondition()) {
2836          try {
2837            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2838              ClientProtos.ResultOrException.newBuilder();
2839            if (regionAction.getActionCount() == 1) {
2840              CheckAndMutateResult result =
2841                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
2842                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
2843              regionActionResultBuilder.setProcessed(result.isSuccess());
2844              resultOrExceptionOrBuilder.setIndex(0);
2845              if (result.getResult() != null) {
2846                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
2847              }
2848              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2849            } else {
2850              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2851                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
2852              regionActionResultBuilder.setProcessed(result.isSuccess());
2853              for (int i = 0; i < regionAction.getActionCount(); i++) {
2854                if (i == 0 && result.getResult() != null) {
2855                  // Set the result of the Increment/Append operations to the first element of the
2856                  // ResultOrException list
2857                  resultOrExceptionOrBuilder.setIndex(i);
2858                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
2859                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
2860                  continue;
2861                }
2862                // To unify the response format with doNonAtomicRegionMutation and read through
2863                // client's AsyncProcess we have to add an empty result instance per operation
2864                resultOrExceptionOrBuilder.clear();
2865                resultOrExceptionOrBuilder.setIndex(i);
2866                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2867              }
2868            }
2869          } catch (IOException e) {
2870            rpcServer.getMetrics().exception(e);
2871            // As it's an atomic operation with a condition, we may expect it's a global failure.
2872            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2873          }
2874        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2875          try {
2876            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
2877              cellScanner, nonceGroup, spaceQuotaEnforcement);
2878            regionActionResultBuilder.setProcessed(true);
2879            // We no longer use MultiResponse#processed. Instead, we use
2880            // RegionActionResult#processed. This is for backward compatibility for old clients.
2881            responseBuilder.setProcessed(true);
2882          } catch (IOException e) {
2883            rpcServer.getMetrics().exception(e);
2884            // As it's atomic, we may expect it's a global failure.
2885            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2886          }
2887        } else {
2888          // doNonAtomicRegionMutation manages the exception internally
2889          if (context != null && closeCallBack == null) {
2890            // An RpcCallBack that creates a list of scanners that needs to perform callBack
2891            // operation on completion of multiGets.
2892            // Set this only once
2893            closeCallBack = new RegionScannersCloseCallBack();
2894            context.setCallBack(closeCallBack);
2895          }
2896          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2897            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
2898            spaceQuotaEnforcement);
2899        }
2900      } finally {
2901        quota.close();
2902      }
2903
2904      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2905      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2906      if (regionLoadStats != null) {
2907        regionStats.put(regionSpecifier, regionLoadStats);
2908      }
2909    }
2910    // Load the controller with the Cells to return.
2911    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2912      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2913    }
2914
2915    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
2916    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
2917      builder.addRegion(stat.getKey());
2918      builder.addStat(stat.getValue());
2919    }
2920    responseBuilder.setRegionStatistics(builder);
2921    return responseBuilder.build();
2922  }
2923
2924  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2925    if (cellScanner == null) {
2926      return;
2927    }
2928    for (Action action : actions) {
2929      skipCellsForMutation(action, cellScanner);
2930    }
2931  }
2932
2933  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2934    if (cellScanner == null) {
2935      return;
2936    }
2937    try {
2938      if (action.hasMutation()) {
2939        MutationProto m = action.getMutation();
2940        if (m.hasAssociatedCellCount()) {
2941          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2942            cellScanner.advance();
2943          }
2944        }
2945      }
2946    } catch (IOException e) {
2947      // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2948      // marked as failed as we could not see the Region here. At client side the top level
2949      // RegionAction exception will be considered first.
2950      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2951    }
2952  }
2953
2954  /**
2955   * Mutate data in a table.
2956   * @param rpcc    the RPC controller
2957   * @param request the mutate request
2958   */
2959  @Override
2960  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
2961    throws ServiceException {
2962    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2963    // It is also the conduit via which we pass back data.
2964    HBaseRpcController controller = (HBaseRpcController) rpcc;
2965    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2966    OperationQuota quota = null;
2967    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2968    // Clear scanner so we are not holding on to reference across call.
2969    if (controller != null) {
2970      controller.setCellScanner(null);
2971    }
2972    try {
2973      checkOpen();
2974      requestCount.increment();
2975      rpcMutateRequestCount.increment();
2976      HRegion region = getRegion(request.getRegion());
2977      MutateResponse.Builder builder = MutateResponse.newBuilder();
2978      MutationProto mutation = request.getMutation();
2979      if (!region.getRegionInfo().isMetaRegion()) {
2980        regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
2981      }
2982      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2983      OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request);
2984      quota = getRpcQuotaManager().checkBatchQuota(region, operationType);
2985      ActivePolicyEnforcement spaceQuotaEnforcement =
2986        getSpaceQuotaManager().getActiveEnforcements();
2987
2988      if (request.hasCondition()) {
2989        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
2990          request.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
2991        builder.setProcessed(result.isSuccess());
2992        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
2993        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
2994        if (clientCellBlockSupported) {
2995          addSize(context, result.getResult());
2996        }
2997      } else {
2998        Result r = null;
2999        Boolean processed = null;
3000        MutationType type = mutation.getMutateType();
3001        switch (type) {
3002          case APPEND:
3003            // TODO: this doesn't actually check anything.
3004            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
3005              context);
3006            break;
3007          case INCREMENT:
3008            // TODO: this doesn't actually check anything.
3009            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
3010              context);
3011            break;
3012          case PUT:
3013            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
3014            processed = Boolean.TRUE;
3015            break;
3016          case DELETE:
3017            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
3018            processed = Boolean.TRUE;
3019            break;
3020          default:
3021            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
3022        }
3023        if (processed != null) {
3024          builder.setProcessed(processed);
3025        }
3026        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
3027        addResult(builder, r, controller, clientCellBlockSupported);
3028        if (clientCellBlockSupported) {
3029          addSize(context, r);
3030        }
3031      }
3032      return builder.build();
3033    } catch (IOException ie) {
3034      regionServer.checkFileSystem();
3035      throw new ServiceException(ie);
3036    } finally {
3037      if (quota != null) {
3038        quota.close();
3039      }
3040    }
3041  }
3042
3043  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
3044    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
3045    long before = EnvironmentEdgeManager.currentTime();
3046    Put put = ProtobufUtil.toPut(mutation, cellScanner);
3047    checkCellSizeLimit(region, put);
3048    spaceQuota.getPolicyEnforcement(region).check(put);
3049    quota.addMutation(put);
3050    region.put(put);
3051
3052    MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
3053    if (metricsRegionServer != null) {
3054      long after = EnvironmentEdgeManager.currentTime();
3055      metricsRegionServer.updatePut(region, after - before);
3056    }
3057  }
3058
3059  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
3060    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
3061    long before = EnvironmentEdgeManager.currentTime();
3062    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
3063    checkCellSizeLimit(region, delete);
3064    spaceQuota.getPolicyEnforcement(region).check(delete);
3065    quota.addMutation(delete);
3066    region.delete(delete);
3067
3068    MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
3069    if (metricsRegionServer != null) {
3070      long after = EnvironmentEdgeManager.currentTime();
3071      metricsRegionServer.updateDelete(region, after - before);
3072    }
3073  }
3074
3075  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
3076    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
3077    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
3078    long before = EnvironmentEdgeManager.currentTime();
3079    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
3080    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
3081    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
3082    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
3083    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
3084    quota.addMutation((Mutation) checkAndMutate.getAction());
3085
3086    CheckAndMutateResult result = null;
3087    if (region.getCoprocessorHost() != null) {
3088      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
3089    }
3090    if (result == null) {
3091      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
3092      if (region.getCoprocessorHost() != null) {
3093        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
3094      }
3095    }
3096    MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
3097    if (metricsRegionServer != null) {
3098      long after = EnvironmentEdgeManager.currentTime();
3099      long blockBytesScanned =
3100        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
3101      metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned);
3102
3103      MutationType type = mutation.getMutateType();
3104      switch (type) {
3105        case PUT:
3106          metricsRegionServer.updateCheckAndPut(region, after - before);
3107          break;
3108        case DELETE:
3109          metricsRegionServer.updateCheckAndDelete(region, after - before);
3110          break;
3111        default:
3112          break;
3113      }
3114    }
3115    return result;
3116  }
3117
  // This is used to keep compatible with the old client implementation. Consider remove it if we
  // decide to drop the support of the client that still sends close request to a region scanner
  // which has already been exhausted.
  // Pre-allocated singleton: fillInStackTrace is overridden to a no-op so throwing this
  // repeatedly is cheap and never captures a (misleading, shared) stack trace.
  @Deprecated
  private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {

    private static final long serialVersionUID = -4305297078988180130L;

    @Override
    public synchronized Throwable fillInStackTrace() {
      return this;
    }
  };
3131
3132  private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
3133    String scannerName = toScannerName(request.getScannerId());
3134    RegionScannerHolder rsh = this.scanners.get(scannerName);
3135    if (rsh == null) {
3136      // just ignore the next or close request if scanner does not exists.
3137      if (closedScanners.getIfPresent(scannerName) != null) {
3138        throw SCANNER_ALREADY_CLOSED;
3139      } else {
3140        LOG.warn("Client tried to access missing scanner " + scannerName);
3141        throw new UnknownScannerException(
3142          "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
3143            + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
3144            + "long wait between consecutive client checkins, c) Server may be closing down, "
3145            + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
3146            + "possible fix would be increasing the value of"
3147            + "'hbase.client.scanner.timeout.period' configuration.");
3148      }
3149    }
3150    RegionInfo hri = rsh.s.getRegionInfo();
3151    // Yes, should be the same instance
3152    if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) {
3153      String msg = "Region has changed on the scanner " + scannerName + ": regionName="
3154        + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r;
3155      LOG.warn(msg + ", closing...");
3156      scanners.remove(scannerName);
3157      try {
3158        rsh.s.close();
3159      } catch (IOException e) {
3160        LOG.warn("Getting exception closing " + scannerName, e);
3161      } finally {
3162        try {
3163          regionServer.getLeaseManager().cancelLease(scannerName);
3164        } catch (LeaseException e) {
3165          LOG.warn("Getting exception closing " + scannerName, e);
3166        }
3167      }
3168      throw new NotServingRegionException(msg);
3169    }
3170    return rsh;
3171  }
3172
3173  /**
3174   * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
3175   *         value.
3176   */
3177  private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request,
3178    ScanResponse.Builder builder) throws IOException {
3179    HRegion region = getRegion(request.getRegion());
3180    ClientProtos.Scan protoScan = request.getScan();
3181    boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
3182    Scan scan = ProtobufUtil.toScan(protoScan);
3183    // if the request doesn't set this, get the default region setting.
3184    if (!isLoadingCfsOnDemandSet) {
3185      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
3186    }
3187
3188    if (!scan.hasFamilies()) {
3189      // Adding all families to scanner
3190      for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
3191        scan.addFamily(family);
3192      }
3193    }
3194    if (region.getCoprocessorHost() != null) {
3195      // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
3196      // wrapper for the core created RegionScanner
3197      region.getCoprocessorHost().preScannerOpen(scan);
3198    }
3199    RegionScannerImpl coreScanner = region.getScanner(scan);
3200    Shipper shipper = coreScanner;
3201    RegionScanner scanner = coreScanner;
3202    try {
3203      if (region.getCoprocessorHost() != null) {
3204        scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
3205      }
3206    } catch (Exception e) {
3207      // Although region coprocessor is for advanced users and they should take care of the
3208      // implementation to not damage the HBase system, closing the scanner on exception here does
3209      // not have any bad side effect, so let's do it
3210      scanner.close();
3211      throw e;
3212    }
3213    long scannerId = scannerIdGenerator.generateNewScannerId();
3214    builder.setScannerId(scannerId);
3215    builder.setMvccReadPoint(scanner.getMvccReadPoint());
3216    builder.setTtl(scannerLeaseTimeoutPeriod);
3217    String scannerName = toScannerName(scannerId);
3218
3219    boolean fullRegionScan =
3220      !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region);
3221
3222    return new Pair<String, RegionScannerHolder>(scannerName,
3223      addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan));
3224  }
3225
3226  /**
3227   * The returned String is used as key doing look up of outstanding Scanners in this Servers'
3228   * this.scanners, the Map of outstanding scanners and their current state.
3229   * @param scannerId A scanner long id.
3230   * @return The long id as a String.
3231   */
3232  private static String toScannerName(long scannerId) {
3233    return Long.toString(scannerId);
3234  }
3235
3236  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
3237    throws OutOfOrderScannerNextException {
3238    // if nextCallSeq does not match throw Exception straight away. This needs to be
3239    // performed even before checking of Lease.
3240    // See HBASE-5974
3241    if (request.hasNextCallSeq()) {
3242      long callSeq = request.getNextCallSeq();
3243      if (!rsh.incNextCallSeq(callSeq)) {
3244        throw new OutOfOrderScannerNextException(
3245          "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: "
3246            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
3247      }
3248    }
3249  }
3250
  /**
   * Re-registers a scanner lease that was removed while the scan RPC was being processed.
   * A LeaseStillHeldException cannot legitimately occur because scanner ids are unique, so it is
   * escalated to an AssertionError rather than handled.
   */
  private void addScannerLeaseBack(LeaseManager.Lease lease) {
    try {
      regionServer.getLeaseManager().addLease(lease);
    } catch (LeaseStillHeldException e) {
      // should not happen as the scanner id is unique.
      throw new AssertionError(e);
    }
  }
3259
3260  // visible for testing only
3261  long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller,
3262    boolean allowHeartbeatMessages) {
3263    // Set the time limit to be half of the more restrictive timeout value (one of the
3264    // timeout values must be positive). In the event that both values are positive, the
3265    // more restrictive of the two is used to calculate the limit.
3266    if (allowHeartbeatMessages) {
3267      long now = EnvironmentEdgeManager.currentTime();
3268      long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now);
3269      if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) {
3270        long timeLimitDelta;
3271        if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) {
3272          timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout);
3273        } else {
3274          timeLimitDelta =
3275            scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout;
3276        }
3277
3278        // Use half of whichever timeout value was more restrictive... But don't allow
3279        // the time limit to be less than the allowable minimum (could cause an
3280        // immediate timeout before scanning any data).
3281        timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
3282        return now + timeLimitDelta;
3283      }
3284    }
3285    // Default value of timeLimit is negative to indicate no timeLimit should be
3286    // enforced.
3287    return -1L;
3288  }
3289
3290  private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) {
3291    long timeout;
3292    if (controller != null && controller.getCallTimeout() > 0) {
3293      timeout = controller.getCallTimeout();
3294    } else if (rpcTimeout > 0) {
3295      timeout = rpcTimeout;
3296    } else {
3297      return -1;
3298    }
3299    if (call != null) {
3300      timeout -= (now - call.getReceiveTime());
3301    }
3302    // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high.
3303    // return minimum value here in that case so we count this in calculating the final delta.
3304    return Math.max(minimumScanTimeLimitDelta, timeout);
3305  }
3306
3307  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
3308    ScannerContext scannerContext, ScanResponse.Builder builder) {
3309    if (numOfCompleteRows >= limitOfRows) {
3310      if (LOG.isTraceEnabled()) {
3311        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows
3312          + " scannerContext: " + scannerContext);
3313      }
3314      builder.setMoreResults(false);
3315    }
3316  }
3317
  /**
   * Performs the reads for one scan RPC against an already-open scanner: loops on
   * RegionScanner#nextRaw until a size, time, batch or row limit is reached (or the region is
   * exhausted), appending Results to {@code results} and recording heartbeat/cursor/metrics and
   * the moreResultsInRegion flag on {@code builder}.
   * @param controller         RPC controller, used together with {@code rpcCall} to derive the
   *                           time limit for this call
   * @param request            the client's ScanRequest (partial-result/heartbeat/metrics flags)
   * @param rsh                holder wrapping the region and scanner to read from
   * @param maxQuotaResultSize maximum result size derived from server config and quotas only
   * @param maxResults         maximum number of Results for this call (the scan's caching value)
   * @param limitOfRows        limit of complete rows for the scan; non-positive means no limit
   * @param results            output list; may already contain results added by the coprocessor
   *                           preScannerNext hook
   * @param builder            response builder this method updates
   * @param rpcCall            current RPC call context, or null when there is none
   */
  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
    long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
    ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
    HRegion region = rsh.r;
    RegionScanner scanner = rsh.s;
    long maxResultSize;
    // Effective cell-size limit is the smaller of the scanner's own max result size (if set)
    // and the quota/config-derived maximum.
    if (scanner.getMaxResultSize() > 0) {
      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
    } else {
      maxResultSize = maxQuotaResultSize;
    }
    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
    // arbitrary 32. TODO: keep record of general size of results being returned.
    ArrayList<Cell> values = new ArrayList<>(32);
    region.startRegionOperation(Operation.SCAN);
    long before = EnvironmentEdgeManager.currentTime();
    // Used to check if we've matched the row limit set on the Scan
    int numOfCompleteRows = 0;
    // Count of times we call nextRaw; can be > numOfCompleteRows.
    int numOfNextRawCalls = 0;
    try {
      int numOfResults = 0;
      synchronized (scanner) {
        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
        boolean clientHandlesPartials =
          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
        boolean clientHandlesHeartbeats =
          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();

        // On the server side we must ensure that the correct ordering of partial results is
        // returned to the client to allow them to properly reconstruct the partial results.
        // If the coprocessor host is adding to the result list, we cannot guarantee the
        // correct ordering of partial results and so we prevent partial results from being
        // formed.
        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
        boolean moreRows = false;

        // Heartbeat messages occur when the processing of the ScanRequest exceeds a
        // certain time threshold on the server. When the time threshold is exceeded, the
        // server stops the scan and sends back whatever Results it has accumulated within
        // that time period (may be empty). Since heartbeat messages have the potential to
        // create partial Results (in the event that the timeout occurs in the middle of a
        // row), we must only generate heartbeat messages when the client can handle both
        // heartbeats AND partials
        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;

        long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);

        // Partial-result/heartbeat support widens the scope at which the size/time limits may
        // interrupt the scan (between cells rather than only between rows).
        final LimitScope sizeScope =
          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
        final LimitScope timeScope =
          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;

        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();

        // Configure with limits for this RPC. Set keep progress true since size progress
        // towards size limit should be kept between calls to nextRaw
        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
        // of heap size occupied by cells(being read). Cell data means its key and value parts.
        // maxQuotaResultSize - max results just from server side configuration and quotas, without
        // user's specified max. We use this for evaluating limits based on blocks (not cells).
        // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
        // cell or block size from maximum here so we adhere to total limits of request.
        // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
        // have accumulated block bytes. If not, this will be 0 for block size.
        long maxCellSize = maxResultSize;
        long maxBlockSize = maxQuotaResultSize;
        if (rpcCall != null) {
          maxBlockSize -= rpcCall.getBlockBytesScanned();
          maxCellSize -= rpcCall.getResponseCellSize();
        }

        contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
        contextBuilder.setBatchLimit(scanner.getBatch());
        contextBuilder.setTimeLimit(timeScope, timeLimit);
        contextBuilder.setTrackMetrics(trackMetrics);
        ScannerContext scannerContext = contextBuilder.build();
        boolean limitReached = false;
        while (numOfResults < maxResults) {
          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
          // batch limit is a limit on the number of cells per Result. Thus, if progress is
          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
          // reset the batch progress between nextRaw invocations since we don't want the
          // batch progress from previous calls to affect future calls
          scannerContext.setBatchProgress(0);
          assert values.isEmpty();

          // Collect values to be returned here
          moreRows = scanner.nextRaw(values, scannerContext);
          if (rpcCall == null) {
            // When there is no RpcCallContext, copy cells to heap, then the scanner would close.
            // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
            // buffers. See more details in HBASE-26036.
            CellUtil.cloneIfNecessary(values);
          }
          numOfNextRawCalls++;

          if (!values.isEmpty()) {
            if (limitOfRows > 0) {
              // First we need to check if the last result is partial and we have a row change. If
              // so then we need to increase the numOfCompleteRows.
              if (results.isEmpty()) {
                // A partial row may span calls: compare against the row remembered on the holder
                // from the previous RPC.
                if (
                  rsh.rowOfLastPartialResult != null
                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              } else {
                Result lastResult = results.get(results.size() - 1);
                if (
                  lastResult.mayHaveMoreCellsInRow()
                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              }
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
            results.add(r);
            numOfResults++;
            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
              numOfCompleteRows++;
              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
          } else if (!moreRows && !results.isEmpty()) {
            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
            // last result is false. Otherwise it's possible that: the first nextRaw returned
            // because BATCH_LIMIT_REACHED (BTW it happens to exhaust all cells of the scan), so the
            // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
            // return with moreRows=false, which means moreResultsInRegion would be false, it will
            // be a contradictory state (HBASE-21206).
            int lastIdx = results.size() - 1;
            Result r = results.get(lastIdx);
            if (r.mayHaveMoreCellsInRow()) {
              results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
            }
          }
          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
          boolean resultsLimitReached = numOfResults >= maxResults;
          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

          if (limitReached || !moreRows) {
            // With block size limit, we may exceed size limit without collecting any results.
            // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
            // or cursor if results were collected, for example for cell size or heap size limits.
            boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
            // We only want to mark a ScanResponse as a heartbeat message in the event that
            // there are more values to be read server side. If there aren't more values,
            // marking it as a heartbeat is wasteful because the client will need to issue
            // another ScanRequest only to realize that they already have all the values
            if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
              // Heartbeat messages occur when the time limit has been reached, or size limit has
              // been reached before collecting any results. This can happen for heavily filtered
              // scans which scan over too many blocks.
              builder.setHeartbeatMessage(true);
              if (rsh.needCursor) {
                Cell cursorCell = scannerContext.getLastPeekedCell();
                if (cursorCell != null) {
                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
                }
              }
            }
            break;
          }
          values.clear();
        }
        if (rpcCall != null) {
          rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
        }
        builder.setMoreResultsInRegion(moreRows);
        // Check to see if the client requested that we track metrics server side. If the
        // client requested metrics, retrieve the metrics from the scanner context.
        if (trackMetrics) {
          // rather than increment yet another counter in StoreScanner, just set the value here
          // from block size progress before writing into the response
          scannerContext.getMetrics().countOfBlockBytesScanned
            .set(scannerContext.getBlockSizeProgress());
          if (rpcCall != null) {
            scannerContext.getMetrics().fsReadTime.set(rpcCall.getFsReadTime());
          }
          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();

          for (Entry<String, Long> entry : metrics.entrySet()) {
            pairBuilder.setName(entry.getKey());
            pairBuilder.setValue(entry.getValue());
            metricBuilder.addMetrics(pairBuilder.build());
          }

          builder.setScanMetrics(metricBuilder.build());
        }
      }
    } finally {
      region.closeRegionOperation();
      // Update serverside metrics, even on error.
      long end = EnvironmentEdgeManager.currentTime();
      long responseCellSize = 0;
      long blockBytesScanned = 0;
      if (rpcCall != null) {
        responseCellSize = rpcCall.getResponseCellSize();
        blockBytesScanned = rpcCall.getBlockBytesScanned();
        rsh.updateBlockBytesScanned(blockBytesScanned);
      }
      region.getMetrics().updateScanTime(end - before);
      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned);
        metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls);
      }
    }
    // coprocessor postNext hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
    }
  }
3551
3552  /**
3553   * Scan data in a table.
3554   * @param controller the RPC controller
3555   * @param request    the scan request
3556   */
3557  @Override
3558  public ScanResponse scan(final RpcController controller, final ScanRequest request)
3559    throws ServiceException {
3560    if (controller != null && !(controller instanceof HBaseRpcController)) {
3561      throw new UnsupportedOperationException(
3562        "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
3563    }
3564    if (!request.hasScannerId() && !request.hasScan()) {
3565      throw new ServiceException(
3566        new DoNotRetryIOException("Missing required input: scannerId or scan"));
3567    }
3568    try {
3569      checkOpen();
3570    } catch (IOException e) {
3571      if (request.hasScannerId()) {
3572        String scannerName = toScannerName(request.getScannerId());
3573        if (LOG.isDebugEnabled()) {
3574          LOG.debug(
3575            "Server shutting down and client tried to access missing scanner " + scannerName);
3576        }
3577        final LeaseManager leaseManager = regionServer.getLeaseManager();
3578        if (leaseManager != null) {
3579          try {
3580            leaseManager.cancelLease(scannerName);
3581          } catch (LeaseException le) {
3582            // No problem, ignore
3583            if (LOG.isTraceEnabled()) {
3584              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
3585            }
3586          }
3587        }
3588      }
3589      throw new ServiceException(e);
3590    }
3591    requestCount.increment();
3592    rpcScanRequestCount.increment();
3593    RegionScannerHolder rsh;
3594    ScanResponse.Builder builder = ScanResponse.newBuilder();
3595    String scannerName;
3596    try {
3597      if (request.hasScannerId()) {
3598        // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
3599        // for more details.
3600        long scannerId = request.getScannerId();
3601        builder.setScannerId(scannerId);
3602        scannerName = toScannerName(scannerId);
3603        rsh = getRegionScanner(request);
3604      } else {
3605        Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder);
3606        scannerName = scannerNameAndRSH.getFirst();
3607        rsh = scannerNameAndRSH.getSecond();
3608      }
3609    } catch (IOException e) {
3610      if (e == SCANNER_ALREADY_CLOSED) {
3611        // Now we will close scanner automatically if there are no more results for this region but
3612        // the old client will still send a close request to us. Just ignore it and return.
3613        return builder.build();
3614      }
3615      throw new ServiceException(e);
3616    }
3617    if (rsh.fullRegionScan) {
3618      rpcFullScanRequestCount.increment();
3619    }
3620    HRegion region = rsh.r;
3621    LeaseManager.Lease lease;
3622    try {
3623      // Remove lease while its being processed in server; protects against case
3624      // where processing of request takes > lease expiration time. or null if none found.
3625      lease = regionServer.getLeaseManager().removeLease(scannerName);
3626    } catch (LeaseException e) {
3627      throw new ServiceException(e);
3628    }
3629    if (request.hasRenew() && request.getRenew()) {
3630      // add back and return
3631      addScannerLeaseBack(lease);
3632      try {
3633        checkScanNextCallSeq(request, rsh);
3634      } catch (OutOfOrderScannerNextException e) {
3635        throw new ServiceException(e);
3636      }
3637      return builder.build();
3638    }
3639    OperationQuota quota;
3640    try {
3641      quota = getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize,
3642        rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference());
3643    } catch (IOException e) {
3644      addScannerLeaseBack(lease);
3645      throw new ServiceException(e);
3646    }
3647    try {
3648      checkScanNextCallSeq(request, rsh);
3649    } catch (OutOfOrderScannerNextException e) {
3650      addScannerLeaseBack(lease);
3651      throw new ServiceException(e);
3652    }
3653    // Now we have increased the next call sequence. If we give client an error, the retry will
3654    // never success. So we'd better close the scanner and return a DoNotRetryIOException to client
3655    // and then client will try to open a new scanner.
3656    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
3657    int rows; // this is scan.getCaching
3658    if (request.hasNumberOfRows()) {
3659      rows = request.getNumberOfRows();
3660    } else {
3661      rows = closeScanner ? 0 : 1;
3662    }
3663    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
3664    // now let's do the real scan.
3665    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
3666    RegionScanner scanner = rsh.s;
3667    // this is the limit of rows for this scan, if we the number of rows reach this value, we will
3668    // close the scanner.
3669    int limitOfRows;
3670    if (request.hasLimitOfRows()) {
3671      limitOfRows = request.getLimitOfRows();
3672    } else {
3673      limitOfRows = -1;
3674    }
3675    boolean scannerClosed = false;
3676    try {
3677      List<Result> results = new ArrayList<>(Math.min(rows, 512));
3678      if (rows > 0) {
3679        boolean done = false;
3680        // Call coprocessor. Get region info from scanner.
3681        if (region.getCoprocessorHost() != null) {
3682          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
3683          if (!results.isEmpty()) {
3684            for (Result r : results) {
3685              // add cell size from CP results so we can track response size and update limits
3686              // when calling scan below if !done. We'll also have tracked block size if the CP
3687              // got results from hbase, since StoreScanner tracks that for all calls automatically.
3688              addSize(rpcCall, r);
3689            }
3690          }
3691          if (bypass != null && bypass.booleanValue()) {
3692            done = true;
3693          }
3694        }
3695        if (!done) {
3696          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
3697            results, builder, rpcCall);
3698        } else {
3699          builder.setMoreResultsInRegion(!results.isEmpty());
3700        }
3701      } else {
3702        // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
3703        builder.setMoreResultsInRegion(true);
3704      }
3705
3706      quota.addScanResult(results);
3707      addResults(builder, results, (HBaseRpcController) controller,
3708        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
3709        isClientCellBlockSupport(rpcCall));
3710      if (scanner.isFilterDone() && results.isEmpty()) {
3711        // If the scanner's filter - if any - is done with the scan
3712        // only set moreResults to false if the results is empty. This is used to keep compatible
3713        // with the old scan implementation where we just ignore the returned results if moreResults
3714        // is false. Can remove the isEmpty check after we get rid of the old implementation.
3715        builder.setMoreResults(false);
3716      }
3717      // Later we may close the scanner depending on this flag so here we need to make sure that we
3718      // have already set this flag.
3719      assert builder.hasMoreResultsInRegion();
3720      // we only set moreResults to false in the above code, so set it to true if we haven't set it
3721      // yet.
3722      if (!builder.hasMoreResults()) {
3723        builder.setMoreResults(true);
3724      }
3725      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
3726        // Record the last cell of the last result if it is a partial result
3727        // We need this to calculate the complete rows we have returned to client as the
3728        // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
3729        // current row. We may filter out all the remaining cells for the current row and just
3730        // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
3731        // check for row change.
3732        Result lastResult = results.get(results.size() - 1);
3733        if (lastResult.mayHaveMoreCellsInRow()) {
3734          rsh.rowOfLastPartialResult = lastResult.getRow();
3735        } else {
3736          rsh.rowOfLastPartialResult = null;
3737        }
3738      }
3739      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
3740        scannerClosed = true;
3741        closeScanner(region, scanner, scannerName, rpcCall);
3742      }
3743
3744      // There's no point returning to a timed out client. Throwing ensures scanner is closed
3745      if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) {
3746        throw new TimeoutIOException("Client deadline exceeded, cannot return results");
3747      }
3748
3749      return builder.build();
3750    } catch (IOException e) {
3751      try {
3752        // scanner is closed here
3753        scannerClosed = true;
3754        // The scanner state might be left in a dirty state, so we will tell the Client to
3755        // fail this RPC and close the scanner while opening up another one from the start of
3756        // row that the client has last seen.
3757        closeScanner(region, scanner, scannerName, rpcCall);
3758
3759        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
3760        // used in two different semantics.
3761        // (1) The first is to close the client scanner and bubble up the exception all the way
3762        // to the application. This is preferred when the exception is really un-recoverable
3763        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
3764        // bucket usually.
3765        // (2) Second semantics is to close the current region scanner only, but continue the
3766        // client scanner by overriding the exception. This is usually UnknownScannerException,
3767        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
3768        // application-level ClientScanner has to continue without bubbling up the exception to
3769        // the client. See ClientScanner code to see how it deals with these special exceptions.
3770        if (e instanceof DoNotRetryIOException) {
3771          throw e;
3772        }
3773
3774        // If it is a FileNotFoundException, wrap as a
3775        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
3776        if (e instanceof FileNotFoundException) {
3777          throw new DoNotRetryIOException(e);
3778        }
3779
3780        // We closed the scanner already. Instead of throwing the IOException, and client
3781        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
3782        // a special exception to save an RPC.
3783        if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
3784          // 1.4.0+ clients know how to handle
3785          throw new ScannerResetException("Scanner is closed on the server-side", e);
3786        } else {
3787          // older clients do not know about SRE. Just throw USE, which they will handle
3788          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
3789            + " scanner state for clients older than 1.3.", e);
3790        }
3791      } catch (IOException ioe) {
3792        throw new ServiceException(ioe);
3793      }
3794    } finally {
3795      if (!scannerClosed) {
3796        // Adding resets expiration time on lease.
3797        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
3798        if (rpcCall != null) {
3799          rpcCall.setCallBack(rsh.shippedCallback);
3800        } else {
3801          // If context is null,here we call rsh.shippedCallback directly to reuse the logic in
3802          // rsh.shippedCallback to release the internal resources in rsh,and lease is also added
3803          // back to regionserver's LeaseManager in rsh.shippedCallback.
3804          runShippedCallback(rsh);
3805        }
3806      }
3807      quota.close();
3808    }
3809  }
3810
3811  private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
3812    assert rsh.shippedCallback != null;
3813    try {
3814      rsh.shippedCallback.run();
3815    } catch (IOException ioe) {
3816      throw new ServiceException(ioe);
3817    }
3818  }
3819
3820  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
3821    RpcCallContext context) throws IOException {
3822    if (region.getCoprocessorHost() != null) {
3823      if (region.getCoprocessorHost().preScannerClose(scanner)) {
3824        // bypass the actual close.
3825        return;
3826      }
3827    }
3828    RegionScannerHolder rsh = scanners.remove(scannerName);
3829    if (rsh != null) {
3830      if (context != null) {
3831        context.setCallBack(rsh.closeCallBack);
3832      } else {
3833        rsh.s.close();
3834      }
3835      if (region.getCoprocessorHost() != null) {
3836        region.getCoprocessorHost().postScannerClose(scanner);
3837      }
3838      closedScanners.put(scannerName, scannerName);
3839    }
3840  }
3841
  /**
   * Executes a region-server-scoped coprocessor service call by delegating to the hosting
   * region server, after the standard rpcPreCheck gate.
   */
  @Override
  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
    CoprocessorServiceRequest request) throws ServiceException {
    rpcPreCheck("execRegionServerService");
    return regionServer.execRegionServerService(controller, request);
  }
3848
3849  @Override
3850  public UpdateConfigurationResponse updateConfiguration(RpcController controller,
3851    UpdateConfigurationRequest request) throws ServiceException {
3852    try {
3853      requirePermission("updateConfiguration", Permission.Action.ADMIN);
3854      this.regionServer.updateConfiguration();
3855    } catch (Exception e) {
3856      throw new ServiceException(e);
3857    }
3858    return UpdateConfigurationResponse.getDefaultInstance();
3859  }
3860
3861  @Override
3862  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
3863    GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
3864    try {
3865      final RegionServerSpaceQuotaManager manager = regionServer.getRegionServerSpaceQuotaManager();
3866      final GetSpaceQuotaSnapshotsResponse.Builder builder =
3867        GetSpaceQuotaSnapshotsResponse.newBuilder();
3868      if (manager != null) {
3869        final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
3870        for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
3871          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
3872            .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
3873            .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build());
3874        }
3875      }
3876      return builder.build();
3877    } catch (Exception e) {
3878      throw new ServiceException(e);
3879    }
3880  }
3881
3882  @Override
3883  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
3884    ClearRegionBlockCacheRequest request) throws ServiceException {
3885
3886    try {
3887      rpcPreCheck("clearRegionBlockCache");
3888      ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder();
3889      CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
3890      regionServer.getRegionServerCoprocessorHost().preClearRegionBlockCache();
3891      List<HRegion> regions = getRegions(request.getRegionList(), stats);
3892      for (HRegion region : regions) {
3893        try {
3894          stats = stats.append(this.regionServer.clearRegionBlockCache(region));
3895        } catch (Exception e) {
3896          stats.addException(region.getRegionInfo().getRegionName(), e);
3897        }
3898      }
3899      stats.withMaxCacheSize(regionServer.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
3900      regionServer.getRegionServerCoprocessorHost().postClearRegionBlockCache(stats.build());
3901      return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
3902    } catch (IOException e) {
3903      throw new ServiceException(e);
3904    }
3905  }
3906
3907  private void executeOpenRegionProcedures(OpenRegionRequest request,
3908    Map<TableName, TableDescriptor> tdCache) {
3909    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
3910    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
3911      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
3912      TableName tableName = regionInfo.getTable();
3913      TableDescriptor tableDesc = tdCache.get(tableName);
3914      if (tableDesc == null) {
3915        try {
3916          tableDesc = regionServer.getTableDescriptors().get(regionInfo.getTable());
3917        } catch (IOException e) {
3918          // Here we do not fail the whole method since we also need deal with other
3919          // procedures, and we can not ignore this one, so we still schedule a
3920          // AssignRegionHandler and it will report back to master if we still can not get the
3921          // TableDescriptor.
3922          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
3923            regionInfo.getTable(), e);
3924        }
3925        if (tableDesc != null) {
3926          tdCache.put(tableName, tableDesc);
3927        }
3928      }
3929      if (regionOpenInfo.getFavoredNodesCount() > 0) {
3930        regionServer.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
3931          regionOpenInfo.getFavoredNodesList());
3932      }
3933      long procId = regionOpenInfo.getOpenProcId();
3934      if (regionServer.submitRegionProcedure(procId)) {
3935        regionServer.executorService.submit(AssignRegionHandler.create(regionServer, regionInfo,
3936          procId, tableDesc, masterSystemTime));
3937      }
3938    }
3939  }
3940
3941  private void executeCloseRegionProcedures(CloseRegionRequest request) {
3942    String encodedName;
3943    try {
3944      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
3945    } catch (DoNotRetryIOException e) {
3946      throw new UncheckedIOException("Should not happen", e);
3947    }
3948    ServerName destination = request.hasDestinationServer()
3949      ? ProtobufUtil.toServerName(request.getDestinationServer())
3950      : null;
3951    long procId = request.getCloseProcId();
3952    boolean evictCache = request.getEvictCache();
3953    if (regionServer.submitRegionProcedure(procId)) {
3954      regionServer.getExecutorService().submit(UnassignRegionHandler.create(regionServer,
3955        encodedName, procId, false, destination, evictCache));
3956    }
3957  }
3958
3959  private void executeProcedures(RemoteProcedureRequest request) {
3960    RSProcedureCallable callable;
3961    try {
3962      callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
3963        .getDeclaredConstructor().newInstance();
3964    } catch (Exception e) {
3965      LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(),
3966        request.getProcId(), e);
3967      regionServer.remoteProcedureComplete(request.getProcId(), e);
3968      return;
3969    }
3970    callable.init(request.getProcData().toByteArray(), regionServer);
3971    LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
3972    regionServer.executeProcedure(request.getProcId(), callable);
3973  }
3974
3975  @Override
3976  @QosPriority(priority = HConstants.ADMIN_QOS)
3977  public ExecuteProceduresResponse executeProcedures(RpcController controller,
3978    ExecuteProceduresRequest request) throws ServiceException {
3979    try {
3980      checkOpen();
3981      throwOnWrongStartCode(request);
3982      regionServer.getRegionServerCoprocessorHost().preExecuteProcedures();
3983      if (request.getOpenRegionCount() > 0) {
3984        // Avoid reading from the TableDescritor every time(usually it will read from the file
3985        // system)
3986        Map<TableName, TableDescriptor> tdCache = new HashMap<>();
3987        request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
3988      }
3989      if (request.getCloseRegionCount() > 0) {
3990        request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
3991      }
3992      if (request.getProcCount() > 0) {
3993        request.getProcList().forEach(this::executeProcedures);
3994      }
3995      regionServer.getRegionServerCoprocessorHost().postExecuteProcedures();
3996      return ExecuteProceduresResponse.getDefaultInstance();
3997    } catch (IOException e) {
3998      throw new ServiceException(e);
3999    }
4000  }
4001
4002  @Override
4003  @QosPriority(priority = HConstants.ADMIN_QOS)
4004  public SlowLogResponses getSlowLogResponses(final RpcController controller,
4005    final SlowLogResponseRequest request) {
4006    final NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder();
4007    final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder);
4008    SlowLogResponses slowLogResponses =
4009      SlowLogResponses.newBuilder().addAllSlowLogPayloads(slowLogPayloads).build();
4010    return slowLogResponses;
4011  }
4012
4013  private List<SlowLogPayload> getSlowLogPayloads(SlowLogResponseRequest request,
4014    NamedQueueRecorder namedQueueRecorder) {
4015    if (namedQueueRecorder == null) {
4016      return Collections.emptyList();
4017    }
4018    List<SlowLogPayload> slowLogPayloads;
4019    NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
4020    namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
4021    namedQueueGetRequest.setSlowLogResponseRequest(request);
4022    NamedQueueGetResponse namedQueueGetResponse =
4023      namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
4024    slowLogPayloads = namedQueueGetResponse != null
4025      ? namedQueueGetResponse.getSlowLogPayloads()
4026      : Collections.emptyList();
4027    return slowLogPayloads;
4028  }
4029
4030  @Override
4031  @QosPriority(priority = HConstants.ADMIN_QOS)
4032  public SlowLogResponses getLargeLogResponses(final RpcController controller,
4033    final SlowLogResponseRequest request) {
4034    final NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder();
4035    final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder);
4036    SlowLogResponses slowLogResponses =
4037      SlowLogResponses.newBuilder().addAllSlowLogPayloads(slowLogPayloads).build();
4038    return slowLogResponses;
4039  }
4040
4041  @Override
4042  @QosPriority(priority = HConstants.ADMIN_QOS)
4043  public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller,
4044    final ClearSlowLogResponseRequest request) throws ServiceException {
4045    rpcPreCheck("clearSlowLogsResponses");
4046    final NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder();
4047    boolean slowLogsCleaned = Optional.ofNullable(namedQueueRecorder)
4048      .map(
4049        queueRecorder -> queueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG))
4050      .orElse(false);
4051    ClearSlowLogResponses clearSlowLogResponses =
4052      ClearSlowLogResponses.newBuilder().setIsCleaned(slowLogsCleaned).build();
4053    return clearSlowLogResponses;
4054  }
4055
  /**
   * Generic log-entry fetch RPC. The request names the protobuf class of its payload; only
   * payloads whose class name contains "SlowLogResponseRequest" are handled here, anything
   * else results in a ServiceException("Invalid request params").
   */
  @Override
  public HBaseProtos.LogEntry getLogEntries(RpcController controller,
    HBaseProtos.LogRequest request) throws ServiceException {
    try {
      final String logClassName = request.getLogClassName();
      // Resolve the payload's Message subclass and its static parseFrom(ByteString) parser.
      Class<?> logClass = Class.forName(logClassName).asSubclass(Message.class);
      Method method = logClass.getMethod("parseFrom", ByteString.class);
      if (logClassName.contains("SlowLogResponseRequest")) {
        SlowLogResponseRequest slowLogResponseRequest =
          (SlowLogResponseRequest) method.invoke(null, request.getLogMessage());
        final NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder();
        final List<SlowLogPayload> slowLogPayloads =
          getSlowLogPayloads(slowLogResponseRequest, namedQueueRecorder);
        SlowLogResponses slowLogResponses =
          SlowLogResponses.newBuilder().addAllSlowLogPayloads(slowLogPayloads).build();
        // Wrap the typed response back into a generic LogEntry keyed by its class name.
        return HBaseProtos.LogEntry.newBuilder()
          .setLogClassName(slowLogResponses.getClass().getName())
          .setLogMessage(slowLogResponses.toByteString()).build();
      }
    } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
      | InvocationTargetException e) {
      LOG.error("Error while retrieving log entries.", e);
      throw new ServiceException(e);
    }
    // Reached when the class name is not a supported log request type.
    throw new ServiceException("Invalid request params");
  }
4082
4083  @Override
4084  public GetCachedFilesListResponse getCachedFilesList(RpcController controller,
4085    GetCachedFilesListRequest request) throws ServiceException {
4086    GetCachedFilesListResponse.Builder responseBuilder = GetCachedFilesListResponse.newBuilder();
4087    List<String> fullyCachedFiles = new ArrayList<>();
4088    regionServer.getBlockCache().flatMap(BlockCache::getFullyCachedFiles).ifPresent(fcf -> {
4089      fullyCachedFiles.addAll(fcf.keySet());
4090    });
4091    return responseBuilder.addAllCachedFiles(fullyCachedFiles).build();
4092  }
4093
  /** Returns the scheduler backing this service's RPC server. */
  public RpcScheduler getRpcScheduler() {
    return rpcServer.getScheduler();
  }
4097
  /** Returns the access checker used for permission checks on this server. */
  protected AccessChecker getAccessChecker() {
    return accessChecker;
  }
4101
  /** Returns the ZooKeeper permission watcher held by this server. */
  protected ZKPermissionWatcher getZkPermissionWatcher() {
    return zkPermissionWatcher;
  }
4105
4106  @Override
4107  public GetClusterIdResponse getClusterId(RpcController controller, GetClusterIdRequest request)
4108    throws ServiceException {
4109    return GetClusterIdResponse.newBuilder().setClusterId(regionServer.getClusterId()).build();
4110  }
4111
4112  @Override
4113  public GetActiveMasterResponse getActiveMaster(RpcController controller,
4114    GetActiveMasterRequest request) throws ServiceException {
4115    GetActiveMasterResponse.Builder builder = GetActiveMasterResponse.newBuilder();
4116    regionServer.getActiveMaster()
4117      .ifPresent(name -> builder.setServerName(ProtobufUtil.toServerName(name)));
4118    return builder.build();
4119  }
4120
4121  @Override
4122  public GetMastersResponse getMasters(RpcController controller, GetMastersRequest request)
4123    throws ServiceException {
4124    GetMastersResponse.Builder builder = GetMastersResponse.newBuilder();
4125    regionServer.getActiveMaster()
4126      .ifPresent(activeMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder()
4127        .setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true)));
4128    regionServer.getBackupMasters()
4129      .forEach(backupMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder()
4130        .setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false)));
4131    return builder.build();
4132  }
4133
4134  @Override
4135  public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController controller,
4136    GetMetaRegionLocationsRequest request) throws ServiceException {
4137    GetMetaRegionLocationsResponse.Builder builder = GetMetaRegionLocationsResponse.newBuilder();
4138    Optional<List<HRegionLocation>> metaLocations =
4139      regionServer.getMetaRegionLocationCache().getMetaRegionLocations();
4140    metaLocations.ifPresent(hRegionLocations -> hRegionLocations
4141      .forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location))));
4142    return builder.build();
4143  }
4144
4145  @Override
4146  public final GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
4147    GetBootstrapNodesRequest request) throws ServiceException {
4148    int maxNodeCount = regionServer.getConfiguration().getInt(CLIENT_BOOTSTRAP_NODE_LIMIT,
4149      DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
4150    ReservoirSample<ServerName> sample = new ReservoirSample<>(maxNodeCount);
4151    sample.add(regionServer.getBootstrapNodes());
4152
4153    GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
4154    sample.getSamplingResult().stream().map(ProtobufUtil::toServerName)
4155      .forEach(builder::addServerName);
4156    return builder.build();
4157  }
4158
4159  @Override
4160  public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller,
4161    GetAllBootstrapNodesRequest request) throws ServiceException {
4162    GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder();
4163    regionServer.getBootstrapNodes()
4164      .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server)));
4165    return builder.build();
4166  }
4167
  /**
   * (Re)reads the guardrail settings that may change on a live configuration reload: the
   * batch-row warn threshold, whether oversized batches are rejected, and the server-side
   * maximum scanner result size.
   */
  private void setReloadableGuardrails(Configuration conf) {
    rowSizeWarnThreshold =
      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
    rejectRowsWithSizeOverThreshold =
      conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
    maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
  }
4176}