001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.io.UncheckedIOException;
024import java.net.BindException;
025import java.net.InetAddress;
026import java.net.InetSocketAddress;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.Iterator;
032import java.util.List;
033import java.util.Map;
034import java.util.Map.Entry;
035import java.util.NavigableMap;
036import java.util.Set;
037import java.util.TreeSet;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentMap;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicBoolean;
042import java.util.concurrent.atomic.AtomicLong;
043import java.util.concurrent.atomic.LongAdder;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.FileSystem;
046import org.apache.hadoop.fs.Path;
047import org.apache.hadoop.hbase.CacheEvictionStats;
048import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
049import org.apache.hadoop.hbase.Cell;
050import org.apache.hadoop.hbase.CellScanner;
051import org.apache.hadoop.hbase.CellUtil;
052import org.apache.hadoop.hbase.DoNotRetryIOException;
053import org.apache.hadoop.hbase.DroppedSnapshotException;
054import org.apache.hadoop.hbase.ExtendedCellScannable;
055import org.apache.hadoop.hbase.ExtendedCellScanner;
056import org.apache.hadoop.hbase.HBaseIOException;
057import org.apache.hadoop.hbase.HBaseRpcServicesBase;
058import org.apache.hadoop.hbase.HConstants;
059import org.apache.hadoop.hbase.MultiActionResultTooLarge;
060import org.apache.hadoop.hbase.NotServingRegionException;
061import org.apache.hadoop.hbase.PrivateCellUtil;
062import org.apache.hadoop.hbase.RegionTooBusyException;
063import org.apache.hadoop.hbase.Server;
064import org.apache.hadoop.hbase.ServerName;
065import org.apache.hadoop.hbase.TableName;
066import org.apache.hadoop.hbase.UnknownScannerException;
067import org.apache.hadoop.hbase.client.Append;
068import org.apache.hadoop.hbase.client.CheckAndMutate;
069import org.apache.hadoop.hbase.client.CheckAndMutateResult;
070import org.apache.hadoop.hbase.client.ClientInternalHelper;
071import org.apache.hadoop.hbase.client.Delete;
072import org.apache.hadoop.hbase.client.Durability;
073import org.apache.hadoop.hbase.client.Get;
074import org.apache.hadoop.hbase.client.Increment;
075import org.apache.hadoop.hbase.client.Mutation;
076import org.apache.hadoop.hbase.client.OperationWithAttributes;
077import org.apache.hadoop.hbase.client.Put;
078import org.apache.hadoop.hbase.client.QueryMetrics;
079import org.apache.hadoop.hbase.client.RegionInfo;
080import org.apache.hadoop.hbase.client.RegionReplicaUtil;
081import org.apache.hadoop.hbase.client.Result;
082import org.apache.hadoop.hbase.client.Row;
083import org.apache.hadoop.hbase.client.Scan;
084import org.apache.hadoop.hbase.client.TableDescriptor;
085import org.apache.hadoop.hbase.client.VersionInfoUtil;
086import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
087import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
088import org.apache.hadoop.hbase.exceptions.ScannerResetException;
089import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
090import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
091import org.apache.hadoop.hbase.io.ByteBuffAllocator;
092import org.apache.hadoop.hbase.io.hfile.BlockCache;
093import org.apache.hadoop.hbase.ipc.HBaseRpcController;
094import org.apache.hadoop.hbase.ipc.PriorityFunction;
095import org.apache.hadoop.hbase.ipc.QosPriority;
096import org.apache.hadoop.hbase.ipc.RpcCall;
097import org.apache.hadoop.hbase.ipc.RpcCallContext;
098import org.apache.hadoop.hbase.ipc.RpcCallback;
099import org.apache.hadoop.hbase.ipc.RpcServer;
100import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
101import org.apache.hadoop.hbase.ipc.RpcServerFactory;
102import org.apache.hadoop.hbase.ipc.RpcServerInterface;
103import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
104import org.apache.hadoop.hbase.ipc.ServerRpcController;
105import org.apache.hadoop.hbase.net.Address;
106import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
107import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
108import org.apache.hadoop.hbase.quotas.OperationQuota;
109import org.apache.hadoop.hbase.quotas.QuotaUtil;
110import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
111import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
112import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
113import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
114import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease;
115import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException;
116import org.apache.hadoop.hbase.regionserver.Region.Operation;
117import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
118import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
119import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler;
120import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
121import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
122import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
123import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
124import org.apache.hadoop.hbase.replication.ReplicationUtils;
125import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker;
126import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker;
127import org.apache.hadoop.hbase.security.Superusers;
128import org.apache.hadoop.hbase.security.access.Permission;
129import org.apache.hadoop.hbase.util.Bytes;
130import org.apache.hadoop.hbase.util.DNS;
131import org.apache.hadoop.hbase.util.DNS.ServerType;
132import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
133import org.apache.hadoop.hbase.util.Pair;
134import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
135import org.apache.hadoop.hbase.wal.WAL;
136import org.apache.hadoop.hbase.wal.WALEdit;
137import org.apache.hadoop.hbase.wal.WALKey;
138import org.apache.hadoop.hbase.wal.WALSplitUtil;
139import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
140import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
141import org.apache.yetus.audience.InterfaceAudience;
142import org.slf4j.Logger;
143import org.slf4j.LoggerFactory;
144
145import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
146import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
147import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
148import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
149import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
150import org.apache.hbase.thirdparty.com.google.protobuf.Message;
151import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
152import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
153import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
154import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
155import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
156
157import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
158import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
159import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
248
/**
 * Implements the regionserver RPC services.
 */
@InterfaceAudience.Private
public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
  implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface {

  private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);

  /** RPC scheduler to use for the region server. */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Whether to reject rows with size > threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Upper bound on the result size a single scan RPC may return. Volatile — presumably
  // refreshed outside the constructor (e.g. on configuration reload); TODO confirm.
  private volatile long maxScannerResultSize;

  // Produces ids/names for the region scanners registered in the scanners map below —
  // NOTE(review): confirm generation scheme against ScannerIdGenerator.
  private ScannerIdGenerator scannerIdGenerator;
  // Currently open client scanners, keyed by scanner name.
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name and last sequence number of a closed scanner for a while. This is used
  // to keep compatible for old clients which may send next or close request to a region
  // scanner which has already been exhausted. The entries will be removed automatically
  // after scannerLeaseTimeoutPeriod.
  private final Cache<String, Long> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private volatile int rowSizeWarnThreshold;
  /*
   * Whether we should reject requests with very high no of rows i.e. beyond threshold defined by
   * rowSizeWarnThreshold
   */
  private volatile boolean rejectRowsWithSizeOverThreshold;

  // Set while a clear-compaction-queues request is in flight — NOTE(review): verify usage
  // against the clearCompactionQueues() RPC handler before relying on this.
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below booleans
   * to selectively enable/disable these services (Rare is the case where you would ever turn off
   * one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
    "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
    "hbase.regionserver.client.executorService";
  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
    "hbase.regionserver.client.meta.executorService";
  public static final String REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG =
    "hbase.regionserver.bootstrap.nodes.executorService";
354
355  /**
356   * An Rpc callback for closing a RegionScanner.
357   */
358  private static final class RegionScannerCloseCallBack implements RpcCallback {
359
360    private final RegionScanner scanner;
361
362    public RegionScannerCloseCallBack(RegionScanner scanner) {
363      this.scanner = scanner;
364    }
365
366    @Override
367    public void run() throws IOException {
368      this.scanner.close();
369    }
370  }
371
372  /**
373   * An Rpc callback for doing shipped() call on a RegionScanner.
374   */
375  private class RegionScannerShippedCallBack implements RpcCallback {
376    private final String scannerName;
377    private final Shipper shipper;
378    private final Lease lease;
379
380    public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
381      this.scannerName = scannerName;
382      this.shipper = shipper;
383      this.lease = lease;
384    }
385
386    @Override
387    public void run() throws IOException {
388      this.shipper.shipped();
389      // We're done. On way out re-add the above removed lease. The lease was temp removed for this
390      // Rpc call and we are at end of the call now. Time to add it back.
391      if (scanners.containsKey(scannerName)) {
392        if (lease != null) {
393          server.getLeaseManager().addLease(lease);
394        }
395      }
396    }
397  }
398
399  /**
400   * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
401   * completion of multiGets.
402   */
403  static class RegionScannersCloseCallBack implements RpcCallback {
404    private final List<RegionScanner> scanners = new ArrayList<>();
405
406    public void addScanner(RegionScanner scanner) {
407      this.scanners.add(scanner);
408    }
409
410    @Override
411    public void run() {
412      for (RegionScanner scanner : scanners) {
413        try {
414          scanner.close();
415        } catch (IOException e) {
416          LOG.error("Exception while closing the scanner " + scanner, e);
417        }
418      }
419    }
420  }
421
422  static class RegionScannerContext {
423    final String scannerName;
424    final RegionScannerHolder holder;
425    final OperationQuota quota;
426
427    RegionScannerContext(String scannerName, RegionScannerHolder holder, OperationQuota quota) {
428      this.scannerName = scannerName;
429      this.holder = holder;
430      this.quota = quota;
431    }
432  }
433
434  /**
435   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
436   */
437  static final class RegionScannerHolder {
438    private final AtomicLong nextCallSeq = new AtomicLong(0);
439    private final RegionScanner s;
440    private final HRegion r;
441    private final RpcCallback closeCallBack;
442    private final RpcCallback shippedCallback;
443    private byte[] rowOfLastPartialResult;
444    private boolean needCursor;
445    private boolean fullRegionScan;
446    private final String clientIPAndPort;
447    private final String userName;
448    private volatile long maxBlockBytesScanned = 0;
449    private volatile long prevBlockBytesScanned = 0;
450    private volatile long prevBlockBytesScannedDifference = 0;
451
452    RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
453      RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
454      String clientIPAndPort, String userName) {
455      this.s = s;
456      this.r = r;
457      this.closeCallBack = closeCallBack;
458      this.shippedCallback = shippedCallback;
459      this.needCursor = needCursor;
460      this.fullRegionScan = fullRegionScan;
461      this.clientIPAndPort = clientIPAndPort;
462      this.userName = userName;
463    }
464
465    long getNextCallSeq() {
466      return nextCallSeq.get();
467    }
468
469    boolean incNextCallSeq(long currentSeq) {
470      // Use CAS to prevent multiple scan request running on the same scanner.
471      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
472    }
473
474    long getMaxBlockBytesScanned() {
475      return maxBlockBytesScanned;
476    }
477
478    long getPrevBlockBytesScannedDifference() {
479      return prevBlockBytesScannedDifference;
480    }
481
482    void updateBlockBytesScanned(long blockBytesScanned) {
483      prevBlockBytesScannedDifference = blockBytesScanned - prevBlockBytesScanned;
484      prevBlockBytesScanned = blockBytesScanned;
485      if (blockBytesScanned > maxBlockBytesScanned) {
486        maxBlockBytesScanned = blockBytesScanned;
487      }
488    }
489
490    // Should be called only when we need to print lease expired messages otherwise
491    // cache the String once made.
492    @Override
493    public String toString() {
494      return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName
495        + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString();
496    }
497  }
498
499  /**
500   * Instantiated as a scanner lease. If the lease times out, the scanner is closed
501   */
502  private class ScannerListener implements LeaseListener {
503    private final String scannerName;
504
505    ScannerListener(final String n) {
506      this.scannerName = n;
507    }
508
509    @Override
510    public void leaseExpired() {
511      RegionScannerHolder rsh = scanners.remove(this.scannerName);
512      if (rsh == null) {
513        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
514        return;
515      }
516      LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
517      server.getMetrics().incrScannerLeaseExpired();
518      RegionScanner s = rsh.s;
519      HRegion region = null;
520      try {
521        region = server.getRegion(s.getRegionInfo().getRegionName());
522        if (region != null && region.getCoprocessorHost() != null) {
523          region.getCoprocessorHost().preScannerClose(s);
524        }
525      } catch (IOException e) {
526        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
527      } finally {
528        try {
529          s.close();
530          if (region != null && region.getCoprocessorHost() != null) {
531            region.getCoprocessorHost().postScannerClose(s);
532          }
533        } catch (IOException e) {
534          LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
535        }
536      }
537    }
538  }
539
540  private static ResultOrException getResultOrException(final ClientProtos.Result r,
541    final int index) {
542    return getResultOrException(ResponseConverter.buildActionResult(r), index);
543  }
544
545  private static ResultOrException getResultOrException(final Exception e, final int index) {
546    return getResultOrException(ResponseConverter.buildActionResult(e), index);
547  }
548
549  private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
550    final int index) {
551    return builder.setIndex(index).build();
552  }
553
554  /**
555   * Checks for the following pre-checks in order:
556   * <ol>
557   * <li>RegionServer is running</li>
558   * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
559   * </ol>
560   * @param requestName name of rpc request. Used in reporting failures to provide context.
561   * @throws ServiceException If any of the above listed pre-check fails.
562   */
563  private void rpcPreCheck(String requestName) throws ServiceException {
564    try {
565      checkOpen();
566      requirePermission(requestName, Permission.Action.ADMIN);
567    } catch (IOException ioe) {
568      throw new ServiceException(ioe);
569    }
570  }
571
572  private boolean isClientCellBlockSupport(RpcCallContext context) {
573    return context != null && context.isClientCellBlockSupported();
574  }
575
576  private void addResult(final MutateResponse.Builder builder, final Result result,
577    final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
578    if (result == null) return;
579    if (clientCellBlockSupported) {
580      builder.setResult(ProtobufUtil.toResultNoData(result));
581      rpcc.setCellScanner(result.cellScanner());
582    } else {
583      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
584      builder.setResult(pbr);
585    }
586  }
587
588  private void addResults(ScanResponse.Builder builder, List<Result> results,
589    HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
590    builder.setStale(!isDefaultRegion);
591    if (results.isEmpty()) {
592      return;
593    }
594    if (clientCellBlockSupported) {
595      for (Result res : results) {
596        builder.addCellsPerResult(res.size());
597        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
598      }
599      controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(results));
600    } else {
601      for (Result res : results) {
602        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
603        builder.addResults(pbr);
604      }
605    }
606  }
607
  /**
   * Apply the mutation half of a check-and-mutate: decode the Put/Delete/Increment/Append
   * actions, then run the condition check and mutations atomically against the region,
   * invoking coprocessor pre/post hooks when a coprocessor host is installed.
   * @param region                the region the condition and mutations apply to
   * @param actions               mutation actions only; an action carrying a Get is rejected
   * @param cellScanner           scanner over the cells that accompany the mutations
   * @param condition             the condition that must hold for the mutations to be applied
   * @param nonceGroup            client nonce group for Increment/Append de-duplication
   * @param spaceQuotaEnforcement active space-quota policies checked per mutation
   * @return the outcome of the check-and-mutate; never null
   * @throws IOException on decode failure, quota violation, or region operation failure
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
    CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Number of actions whose cells have been consumed from cellScanner; any trailing
    // actions left unread are skipped in the finally block to keep the scanner aligned.
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaRegion()) {
        // Give the memstore a chance to free memory before taking on more writes.
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      List<Mutation> mutations = new ArrayList<>();
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : actions) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto mutation = action.getMutation();
        MutationType type = mutation.getMutateType();
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(mutation, cellScanner);
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            mutations.add(put);
            break;
          case DELETE:
            Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
            ++countOfCompleteMutation;
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            mutations.add(del);
            break;
          case INCREMENT:
            Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
            // Note: only the nonce of the last Increment/Append seen is retained.
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, increment);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
            mutations.add(increment);
            break;
          case APPEND:
            Append append = ProtobufUtil.toAppend(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, append);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
            mutations.add(append);
            break;
          default:
            throw new DoNotRetryIOException("invalid mutation type : " + type);
        }
      }

      if (mutations.size() == 0) {
        // Nothing to apply; treated as trivially successful with no result payload.
        return new CheckAndMutateResult(true, null);
      } else {
        CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
        CheckAndMutateResult result = null;
        if (region.getCoprocessorHost() != null) {
          // A non-null result from the pre-hook bypasses the region operation entirely.
          result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
        }
        if (result == null) {
          result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
          if (region.getCoprocessorHost() != null) {
            result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
          }
        }
        return result;
      }
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }
684
685  /**
686   * Execute an append mutation.
687   * @return result to return to client if default operation should be bypassed as indicated by
688   *         RegionObserver, null otherwise
689   */
690  private Result append(final HRegion region, final OperationQuota quota,
691    final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
692    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
693    long before = EnvironmentEdgeManager.currentTime();
694    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
695    checkCellSizeLimit(region, append);
696    spaceQuota.getPolicyEnforcement(region).check(append);
697    quota.addMutation(append);
698    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
699    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
700    Result r = region.append(append, nonceGroup, nonce);
701    if (server.getMetrics() != null) {
702      long blockBytesScanned =
703        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
704      server.getMetrics().updateAppend(region, EnvironmentEdgeManager.currentTime() - before,
705        blockBytesScanned);
706    }
707    return r == null ? Result.EMPTY_RESULT : r;
708  }
709
710  /**
711   * Execute an increment mutation.
712   */
713  private Result increment(final HRegion region, final OperationQuota quota,
714    final MutationProto mutation, final CellScanner cells, long nonceGroup,
715    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
716    long before = EnvironmentEdgeManager.currentTime();
717    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
718    checkCellSizeLimit(region, increment);
719    spaceQuota.getPolicyEnforcement(region).check(increment);
720    quota.addMutation(increment);
721    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
722    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
723    Result r = region.increment(increment, nonceGroup, nonce);
724    final MetricsRegionServer metricsRegionServer = server.getMetrics();
725    if (metricsRegionServer != null) {
726      long blockBytesScanned =
727        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
728      metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before,
729        blockBytesScanned);
730    }
731    return r == null ? Result.EMPTY_RESULT : r;
732  }
733
734  /**
735   * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
736   * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
737   * @param cellsToReturn Could be null. May be allocated in this method. This is what this method
738   *                      returns as a 'result'.
739   * @param closeCallBack the callback to be used with multigets
740   * @param context       the current RpcCallContext
741   * @return Return the <code>cellScanner</code> passed
742   */
743  private List<ExtendedCellScannable> doNonAtomicRegionMutation(final HRegion region,
744    final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
745    final RegionActionResult.Builder builder, List<ExtendedCellScannable> cellsToReturn,
746    long nonceGroup, final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
747    ActivePolicyEnforcement spaceQuotaEnforcement) {
748    // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
749    // one at a time, we instead pass them in batch. Be aware that the corresponding
750    // ResultOrException instance that matches each Put or Delete is then added down in the
751    // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
752    // deferred/batched
753    List<ClientProtos.Action> mutations = null;
754    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
755    IOException sizeIOE = null;
756    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
757      ResultOrException.newBuilder();
758    boolean hasResultOrException = false;
759    for (ClientProtos.Action action : actions.getActionList()) {
760      hasResultOrException = false;
761      resultOrExceptionBuilder.clear();
762      try {
763        Result r = null;
764        long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
765        if (
766          context != null && context.isRetryImmediatelySupported()
767            && (context.getResponseCellSize() > maxQuotaResultSize
768              || blockBytesScannedBefore + context.getResponseExceptionSize() > maxQuotaResultSize)
769        ) {
770
771          // We're storing the exception since the exception and reason string won't
772          // change after the response size limit is reached.
773          if (sizeIOE == null) {
774            // We don't need the stack un-winding do don't throw the exception.
775            // Throwing will kill the JVM's JIT.
776            //
777            // Instead just create the exception and then store it.
778            sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
779              + context.getResponseCellSize() + " BlockSize: " + blockBytesScannedBefore);
780
781            // Only report the exception once since there's only one request that
782            // caused the exception. Otherwise this number will dominate the exceptions count.
783            rpcServer.getMetrics().exception(sizeIOE);
784          }
785
786          // Now that there's an exception is known to be created
787          // use it for the response.
788          //
789          // This will create a copy in the builder.
790          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
791          resultOrExceptionBuilder.setException(pair);
792          context.incrementResponseExceptionSize(pair.getSerializedSize());
793          resultOrExceptionBuilder.setIndex(action.getIndex());
794          builder.addResultOrException(resultOrExceptionBuilder.build());
795          skipCellsForMutation(action, cellScanner);
796          continue;
797        }
798        if (action.hasGet()) {
799          long before = EnvironmentEdgeManager.currentTime();
800          ClientProtos.Get pbGet = action.getGet();
801          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
802          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
803          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
804          // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
805          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
806            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
807              + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
808              + "reverse Scan.");
809          }
810          try {
811            Get get = ProtobufUtil.toGet(pbGet);
812            if (context != null) {
813              r = get(get, (region), closeCallBack, context);
814            } else {
815              r = region.get(get);
816            }
817          } finally {
818            final MetricsRegionServer metricsRegionServer = server.getMetrics();
819            if (metricsRegionServer != null) {
820              long blockBytesScanned =
821                context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
822              metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
823                blockBytesScanned);
824            }
825          }
826        } else if (action.hasServiceCall()) {
827          hasResultOrException = true;
828          Message result = execServiceOnRegion(region, action.getServiceCall());
829          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
830            ClientProtos.CoprocessorServiceResult.newBuilder();
831          resultOrExceptionBuilder.setServiceResult(serviceResultBuilder
832            .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName())
833              // TODO: Copy!!!
834              .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
835        } else if (action.hasMutation()) {
836          MutationType type = action.getMutation().getMutateType();
837          if (
838            type != MutationType.PUT && type != MutationType.DELETE && mutations != null
839              && !mutations.isEmpty()
840          ) {
841            // Flush out any Puts or Deletes already collected.
842            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
843              spaceQuotaEnforcement);
844            mutations.clear();
845          }
846          switch (type) {
847            case APPEND:
848              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
849                spaceQuotaEnforcement, context);
850              break;
851            case INCREMENT:
852              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
853                spaceQuotaEnforcement, context);
854              break;
855            case PUT:
856            case DELETE:
857              // Collect the individual mutations and apply in a batch
858              if (mutations == null) {
859                mutations = new ArrayList<>(actions.getActionCount());
860              }
861              mutations.add(action);
862              break;
863            default:
864              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
865          }
866        } else {
867          throw new HBaseIOException("Unexpected Action type");
868        }
869        if (r != null) {
870          ClientProtos.Result pbResult = null;
871          if (isClientCellBlockSupport(context)) {
872            pbResult = ProtobufUtil.toResultNoData(r);
873            // Hard to guess the size here. Just make a rough guess.
874            if (cellsToReturn == null) {
875              cellsToReturn = new ArrayList<>();
876            }
877            cellsToReturn.add(r);
878          } else {
879            pbResult = ProtobufUtil.toResult(r);
880          }
881          addSize(context, r);
882          hasResultOrException = true;
883          resultOrExceptionBuilder.setResult(pbResult);
884        }
885        // Could get to here and there was no result and no exception. Presumes we added
886        // a Put or Delete to the collecting Mutations List for adding later. In this
887        // case the corresponding ResultOrException instance for the Put or Delete will be added
888        // down in the doNonAtomicBatchOp method call rather than up here.
889      } catch (IOException ie) {
890        rpcServer.getMetrics().exception(ie);
891        hasResultOrException = true;
892        NameBytesPair pair = ResponseConverter.buildException(ie);
893        resultOrExceptionBuilder.setException(pair);
894        context.incrementResponseExceptionSize(pair.getSerializedSize());
895      }
896      if (hasResultOrException) {
897        // Propagate index.
898        resultOrExceptionBuilder.setIndex(action.getIndex());
899        builder.addResultOrException(resultOrExceptionBuilder.build());
900      }
901    }
902    // Finish up any outstanding mutations
903    if (!CollectionUtils.isEmpty(mutations)) {
904      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
905    }
906    return cellsToReturn;
907  }
908
909  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
910    if (r.maxCellSize > 0) {
911      CellScanner cells = m.cellScanner();
912      while (cells.advance()) {
913        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
914        if (size > r.maxCellSize) {
915          String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of "
916            + r.maxCellSize + " bytes";
917          LOG.debug(msg);
918          throw new DoNotRetryIOException(msg);
919        }
920      }
921    }
922  }
923
  // Runs a batch of mutations atomically, propagating any failure to the caller.
  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Just throw the exception. The exception will be caught and then added to the region-level
    // exception for the RegionAction. Leaving the action result null is ok since the null
    // result is viewed as failure by the hbase client. And the region-level exception will be
    // used to replace the null result. See AsyncRequestFutureImpl#receiveMultiAction and
    // AsyncBatchRpcRetryingCaller#onComplete for more details.
    doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
  }
934
935  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
936    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
937    ActivePolicyEnforcement spaceQuotaEnforcement) {
938    try {
939      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
940        spaceQuotaEnforcement, false);
941    } catch (IOException e) {
942      // Set the exception for each action. The mutations in same RegionAction are group to
943      // different batch and then be processed individually. Hence, we don't set the region-level
944      // exception here for whole RegionAction.
945      for (Action mutation : mutations) {
946        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
947      }
948    }
949  }
950
951  /**
952   * Execute a list of mutations.
953   */
954  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
955    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
956    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
957    throws IOException {
958    Mutation[] mArray = new Mutation[mutations.size()];
959    long before = EnvironmentEdgeManager.currentTime();
960    boolean batchContainsPuts = false, batchContainsDelete = false;
961    try {
962      /**
963       * HBASE-17924 mutationActionMap is a map to map the relation between mutations and actions
964       * since mutation array may have been reoredered.In order to return the right result or
965       * exception to the corresponding actions, We need to know which action is the mutation belong
966       * to. We can't sort ClientProtos.Action array, since they are bonded to cellscanners.
967       */
968      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
969      int i = 0;
970      long nonce = HConstants.NO_NONCE;
971      for (ClientProtos.Action action : mutations) {
972        if (action.hasGet()) {
973          throw new DoNotRetryIOException(
974            "Atomic put and/or delete only, not a Get=" + action.getGet());
975        }
976        MutationProto m = action.getMutation();
977        Mutation mutation;
978        switch (m.getMutateType()) {
979          case PUT:
980            mutation = ProtobufUtil.toPut(m, cells);
981            batchContainsPuts = true;
982            break;
983
984          case DELETE:
985            mutation = ProtobufUtil.toDelete(m, cells);
986            batchContainsDelete = true;
987            break;
988
989          case INCREMENT:
990            mutation = ProtobufUtil.toIncrement(m, cells);
991            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
992            break;
993
994          case APPEND:
995            mutation = ProtobufUtil.toAppend(m, cells);
996            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
997            break;
998
999          default:
1000            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
1001        }
1002        mutationActionMap.put(mutation, action);
1003        mArray[i++] = mutation;
1004        checkCellSizeLimit(region, mutation);
1005        // Check if a space quota disallows this mutation
1006        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
1007        quota.addMutation(mutation);
1008      }
1009
1010      if (!region.getRegionInfo().isMetaRegion()) {
1011        server.getMemStoreFlusher().reclaimMemStoreMemory();
1012      }
1013
1014      // HBASE-17924
1015      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
1016      // order is preserved as its expected from the client
1017      if (!atomic) {
1018        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
1019      }
1020
1021      OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);
1022
1023      // When atomic is true, it indicates that the mutateRow API or the batch API with
1024      // RowMutations is called. In this case, we need to merge the results of the
1025      // Increment/Append operations if the mutations include those operations, and set the merged
1026      // result to the first element of the ResultOrException list
1027      if (atomic) {
1028        List<ResultOrException> resultOrExceptions = new ArrayList<>();
1029        List<Result> results = new ArrayList<>();
1030        for (i = 0; i < codes.length; i++) {
1031          if (codes[i].getResult() != null) {
1032            results.add(codes[i].getResult());
1033          }
1034          if (i != 0) {
1035            resultOrExceptions
1036              .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i));
1037          }
1038        }
1039
1040        if (results.isEmpty()) {
1041          builder.addResultOrException(
1042            getResultOrException(ClientProtos.Result.getDefaultInstance(), 0));
1043        } else {
1044          // Merge the results of the Increment/Append operations
1045          List<Cell> cellList = new ArrayList<>();
1046          for (Result result : results) {
1047            if (result.rawCells() != null) {
1048              cellList.addAll(Arrays.asList(result.rawCells()));
1049            }
1050          }
1051          Result result = Result.create(cellList);
1052
1053          // Set the merged result of the Increment/Append operations to the first element of the
1054          // ResultOrException list
1055          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
1056        }
1057
1058        builder.addAllResultOrException(resultOrExceptions);
1059        return;
1060      }
1061
1062      for (i = 0; i < codes.length; i++) {
1063        Mutation currentMutation = mArray[i];
1064        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
1065        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
1066        Exception e;
1067        switch (codes[i].getOperationStatusCode()) {
1068          case BAD_FAMILY:
1069            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
1070            builder.addResultOrException(getResultOrException(e, index));
1071            break;
1072
1073          case SANITY_CHECK_FAILURE:
1074            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
1075            builder.addResultOrException(getResultOrException(e, index));
1076            break;
1077
1078          default:
1079            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
1080            builder.addResultOrException(getResultOrException(e, index));
1081            break;
1082
1083          case SUCCESS:
1084            ClientProtos.Result result = codes[i].getResult() == null
1085              ? ClientProtos.Result.getDefaultInstance()
1086              : ProtobufUtil.toResult(codes[i].getResult());
1087            builder.addResultOrException(getResultOrException(result, index));
1088            break;
1089
1090          case STORE_TOO_BUSY:
1091            e = new RegionTooBusyException(codes[i].getExceptionMsg());
1092            builder.addResultOrException(getResultOrException(e, index));
1093            break;
1094        }
1095      }
1096    } finally {
1097      int processedMutationIndex = 0;
1098      for (Action mutation : mutations) {
1099        // The non-null mArray[i] means the cell scanner has been read.
1100        if (mArray[processedMutationIndex++] == null) {
1101          skipCellsForMutation(mutation, cells);
1102        }
1103      }
1104      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1105    }
1106  }
1107
1108  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
1109    boolean batchContainsDelete) {
1110    final MetricsRegionServer metricsRegionServer = server.getMetrics();
1111    if (metricsRegionServer != null) {
1112      long after = EnvironmentEdgeManager.currentTime();
1113      if (batchContainsPuts) {
1114        metricsRegionServer.updatePutBatch(region, after - starttime);
1115      }
1116      if (batchContainsDelete) {
1117        metricsRegionServer.updateDeleteBatch(region, after - starttime);
1118      }
1119    }
1120  }
1121
1122  /**
1123   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
1124   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
1125   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
1126   *         exceptionMessage if any
1127   * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying
1128   *             edits for secondary replicas any more, see
1129   *             {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}.
1130   */
1131  @Deprecated
1132  private OperationStatus[] doReplayBatchOp(final HRegion region,
1133    final List<MutationReplay> mutations, long replaySeqId) throws IOException {
1134    long before = EnvironmentEdgeManager.currentTime();
1135    boolean batchContainsPuts = false, batchContainsDelete = false;
1136    try {
1137      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
1138        MutationReplay m = it.next();
1139
1140        if (m.getType() == MutationType.PUT) {
1141          batchContainsPuts = true;
1142        } else {
1143          batchContainsDelete = true;
1144        }
1145
1146        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
1147        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
1148        if (metaCells != null && !metaCells.isEmpty()) {
1149          for (Cell metaCell : metaCells) {
1150            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
1151            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1152            HRegion hRegion = region;
1153            if (compactionDesc != null) {
1154              // replay the compaction. Remove the files from stores only if we are the primary
1155              // region replica (thus own the files)
1156              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
1157                replaySeqId);
1158              continue;
1159            }
1160            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
1161            if (flushDesc != null && !isDefaultReplica) {
1162              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
1163              continue;
1164            }
1165            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
1166            if (regionEvent != null && !isDefaultReplica) {
1167              hRegion.replayWALRegionEventMarker(regionEvent);
1168              continue;
1169            }
1170            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
1171            if (bulkLoadEvent != null) {
1172              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
1173              continue;
1174            }
1175          }
1176          it.remove();
1177        }
1178      }
1179      requestCount.increment();
1180      if (!region.getRegionInfo().isMetaRegion()) {
1181        server.getMemStoreFlusher().reclaimMemStoreMemory();
1182      }
1183      return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
1184        replaySeqId);
1185    } finally {
1186      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1187    }
1188  }
1189
1190  private void closeAllScanners() {
1191    // Close any outstanding scanners. Means they'll get an UnknownScanner
1192    // exception next time they come in.
1193    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
1194      try {
1195        e.getValue().s.close();
1196      } catch (IOException ioe) {
1197        LOG.warn("Closing scanner " + e.getKey(), ioe);
1198      }
1199    }
1200  }
1201
  // Directly invoked only for testing
  public RSRpcServices(final HRegionServer rs) throws IOException {
    super(rs, rs.getProcessName());
    final Configuration conf = rs.getConfiguration();
    setReloadableGuardrails(conf);
    // Scanner leases expire after the client-side scanner timeout period.
    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
    rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
    // Remember recently closed scanner ids for one lease period (presumably so late client
    // calls can be handled distinctly from never-known scanners — confirm in scan handling).
    closedScanners = CacheBuilder.newBuilder()
      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }
1217
  /** Region servers enable the byte-buffer reservoir by default. */
  @Override
  protected boolean defaultReservoirEnabled() {
    return true;
  }
1222
  /** Returns the DNS server type used when resolving this server's hostname. */
  @Override
  protected ServerType getDNSServerType() {
    return DNS.ServerType.REGIONSERVER;
  }
1227
  /**
   * Returns the hostname to use for RPC, preferring an explicit
   * "hbase.regionserver.ipc.address" setting over the supplied default.
   */
  @Override
  protected String getHostname(Configuration conf, String defaultHostname) {
    return conf.get("hbase.regionserver.ipc.address", defaultHostname);
  }
1232
  /** Returns the configuration key that names the region server RPC port. */
  @Override
  protected String getPortConfigName() {
    return HConstants.REGIONSERVER_PORT;
  }
1237
  /** Returns the default region server RPC port. */
  @Override
  protected int getDefaultPort() {
    return HConstants.DEFAULT_REGIONSERVER_PORT;
  }
1242
  /**
   * Returns the configured RpcSchedulerFactory class, defaulting to
   * {@link SimpleRpcSchedulerFactory} when none is configured.
   */
  @Override
  protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }
1248
1249  protected RpcServerInterface createRpcServer(final Server server,
1250    final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
1251    final String name) throws IOException {
1252    final Configuration conf = server.getConfiguration();
1253    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
1254    try {
1255      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, // use final
1256                                                                                        // bindAddress
1257                                                                                        // for this
1258                                                                                        // server.
1259        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
1260    } catch (BindException be) {
1261      throw new IOException(be.getMessage() + ". To switch ports use the '"
1262        + HConstants.REGIONSERVER_PORT + "' configuration property.",
1263        be.getCause() != null ? be.getCause() : be);
1264    }
1265  }
1266
1267  protected Class<?> getRpcSchedulerFactoryClass() {
1268    final Configuration conf = server.getConfiguration();
1269    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
1270      SimpleRpcSchedulerFactory.class);
1271  }
1272
  /** Builds the priority function used to rank incoming RPCs (annotation-reading impl). */
  protected PriorityFunction createPriority() {
    return new RSAnnotationReadingPriorityFunction(this);
  }
1276
  /** Returns the number of currently registered region scanners on this server. */
  public int getScannersCount() {
    return scanners.size();
  }
1280
1281  /** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
1282  RegionScanner getScanner(long scannerId) {
1283    RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId);
1284    return rsh == null ? null : rsh.s;
1285  }
1286
  /** Returns The associated RegionScannerHolder for <code>scannerId</code> or null. */
  private RegionScannerHolder checkQuotaAndGetRegionScannerContext(long scannerId) {
    // The scanners map is keyed by the string form of the id; see toScannerName.
    return scanners.get(toScannerName(scannerId));
  }
1291
1292  public String getScanDetailsWithId(long scannerId) {
1293    RegionScanner scanner = getScanner(scannerId);
1294    if (scanner == null) {
1295      return null;
1296    }
1297    StringBuilder builder = new StringBuilder();
1298    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
1299    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
1300    builder.append(" operation_id: ").append(scanner.getOperationId());
1301    return builder.toString();
1302  }
1303
1304  public String getScanDetailsWithRequest(ScanRequest request) {
1305    try {
1306      if (!request.hasRegion()) {
1307        return null;
1308      }
1309      Region region = getRegion(request.getRegion());
1310      StringBuilder builder = new StringBuilder();
1311      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
1312      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
1313      for (NameBytesPair pair : request.getScan().getAttributeList()) {
1314        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
1315          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
1316          break;
1317        }
1318      }
1319      return builder.toString();
1320    } catch (IOException ignored) {
1321      return null;
1322    }
1323  }
1324
1325  /**
1326   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
1327   */
1328  long getScannerVirtualTime(long scannerId) {
1329    RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId);
1330    return rsh == null ? 0L : rsh.getNextCallSeq();
1331  }
1332
1333  /**
1334   * Method to account for the size of retained cells.
1335   * @param context rpc call context
1336   * @param r       result to add size.
1337   * @return an object that represents the last referenced block from this response.
1338   */
1339  void addSize(RpcCallContext context, Result r) {
1340    if (context != null && r != null && !r.isEmpty()) {
1341      for (Cell c : r.rawCells()) {
1342        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
1343      }
1344    }
1345  }
1346
1347  /** Returns Remote client's ip and port else null if can't be determined. */
1348  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
1349      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
1350  static String getRemoteClientIpAndPort() {
1351    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
1352    if (rpcCall == null) {
1353      return HConstants.EMPTY_STRING;
1354    }
1355    InetAddress address = rpcCall.getRemoteAddress();
1356    if (address == null) {
1357      return HConstants.EMPTY_STRING;
1358    }
1359    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
1360    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
1361    // scanning than a hostname anyways.
1362    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
1363  }
1364
1365  /** Returns Remote client's username. */
1366  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
1367      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
1368  static String getUserName() {
1369    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
1370    if (rpcCall == null) {
1371      return HConstants.EMPTY_STRING;
1372    }
1373    return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING);
1374  }
1375
  /**
   * Registers an open scanner with this server: creates a lease under the scanner's name so idle
   * scanners can be expired, wires up the shipped and close callbacks, and publishes the holder
   * in the scanners map.
   * @throws LeaseStillHeldException if a lease already exists under this scanner name
   */
  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
    HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
    Lease lease = server.getLeaseManager().createLease(scannerName, this.scannerLeaseTimeoutPeriod,
      new ScannerListener(scannerName));
    // Callback invoked when results are shipped to the client; presumably manages the lease —
    // confirm against RegionScannerShippedCallBack.
    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
    // Scanners that implement RpcCallback handle their own close; others get a wrapper.
    RpcCallback closeCallback =
      s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s);
    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
      needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
    // Ids come from ScannerIdGenerator and are expected to be unique for the server's lifetime;
    // a collision here indicates a bug in id generation.
    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
      + scannerName + ", " + existing;
    return rsh;
  }
1390
1391  private boolean isFullRegionScan(Scan scan, HRegion region) {
1392    // If the scan start row equals or less than the start key of the region
1393    // and stop row greater than equals end key (if stop row present)
1394    // or if the stop row is empty
1395    // account this as a full region scan
1396    if (
1397      Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0
1398        && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0
1399          && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)
1400          || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
1401    ) {
1402      return true;
1403    }
1404    return false;
1405  }
1406
1407  /**
1408   * Find the HRegion based on a region specifier
1409   * @param regionSpecifier the region specifier
1410   * @return the corresponding region
1411   * @throws IOException if the specifier is not null, but failed to find the region
1412   */
1413  public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException {
1414    return server.getRegion(regionSpecifier.getValue().toByteArray());
1415  }
1416
1417  /**
1418   * Find the List of HRegions based on a list of region specifiers
1419   * @param regionSpecifiers the list of region specifiers
1420   * @return the corresponding list of regions
1421   * @throws IOException if any of the specifiers is not null, but failed to find the region
1422   */
1423  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
1424    final CacheEvictionStatsBuilder stats) {
1425    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
1426    for (RegionSpecifier regionSpecifier : regionSpecifiers) {
1427      try {
1428        regions.add(server.getRegion(regionSpecifier.getValue().toByteArray()));
1429      } catch (NotServingRegionException e) {
1430        stats.addException(regionSpecifier.getValue().toByteArray(), e);
1431      }
1432    }
1433    return regions;
1434  }
1435
  /** Returns the {@link PriorityFunction} used by this server to prioritize requests. */
  PriorityFunction getPriority() {
    return priority;
  }
1439
  /** Returns the server's RPC quota manager. */
  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return server.getRegionServerRpcQuotaManager();
  }
1443
  /** Returns the server's space quota manager. */
  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return server.getRegionServerSpaceQuotaManager();
  }
1447
  /**
   * Starts the RPC services: initializes the scanner id generator (seeded with this server's
   * name) and then performs the internal startup.
   */
  void start(ZKWatcher zkWatcher) {
    this.scannerIdGenerator = new ScannerIdGenerator(this.server.getServerName());
    internalStart(zkWatcher);
  }
1452
  /** Stops the RPC services: closes all open scanners, then performs the internal shutdown. */
  void stop() {
    closeAllScanners();
    internalStop();
  }
1457
1458  /**
1459   * Called to verify that this server is up and running.
1460   */
1461  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
1462  protected void checkOpen() throws IOException {
1463    if (server.isAborted()) {
1464      throw new RegionServerAbortedException("Server " + server.getServerName() + " aborting");
1465    }
1466    if (server.isStopped()) {
1467      throw new RegionServerStoppedException("Server " + server.getServerName() + " stopping");
1468    }
1469    if (!server.isDataFileSystemOk()) {
1470      throw new RegionServerStoppedException("File system not available");
1471    }
1472    if (!server.isOnline()) {
1473      throw new ServerNotRunningYetException(
1474        "Server " + server.getServerName() + " is not running yet");
1475    }
1476  }
1477
1478  /**
1479   * By default, put up an Admin and a Client Service. Set booleans
1480   * <code>hbase.regionserver.admin.executorService</code> and
1481   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
1482   * Default is that both are enabled.
1483   * @return immutable list of blocking services and the security info classes that this server
1484   *         supports
1485   */
1486  protected List<BlockingServiceAndInterface> getServices() {
1487    boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
1488    boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
1489    boolean clientMeta =
1490      getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
1491    boolean bootstrapNodes =
1492      getConfiguration().getBoolean(REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG, true);
1493    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
1494    if (client) {
1495      bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
1496        ClientService.BlockingInterface.class));
1497    }
1498    if (admin) {
1499      bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
1500        AdminService.BlockingInterface.class));
1501    }
1502    if (clientMeta) {
1503      bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
1504        ClientMetaService.BlockingInterface.class));
1505    }
1506    if (bootstrapNodes) {
1507      bssi.add(
1508        new BlockingServiceAndInterface(BootstrapNodeService.newReflectiveBlockingService(this),
1509          BootstrapNodeService.BlockingInterface.class));
1510    }
1511    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
1512  }
1513
1514  /**
1515   * Close a region on the region server.
1516   * @param controller the RPC controller
1517   * @param request    the request
1518   */
1519  @Override
1520  @QosPriority(priority = HConstants.ADMIN_QOS)
1521  public CloseRegionResponse closeRegion(final RpcController controller,
1522    final CloseRegionRequest request) throws ServiceException {
1523    final ServerName sn = (request.hasDestinationServer()
1524      ? ProtobufUtil.toServerName(request.getDestinationServer())
1525      : null);
1526
1527    try {
1528      checkOpen();
1529      throwOnWrongStartCode(request);
1530      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1531
1532      requestCount.increment();
1533      if (sn == null) {
1534        LOG.info("Close " + encodedRegionName + " without moving");
1535      } else {
1536        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1537      }
1538      boolean closed = server.closeRegion(encodedRegionName, false, sn);
1539      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1540      return builder.build();
1541    } catch (IOException ie) {
1542      throw new ServiceException(ie);
1543    }
1544  }
1545
1546  /**
1547   * Compact a region on the region server.
1548   * @param controller the RPC controller
1549   * @param request    the request
1550   */
1551  @Override
1552  @QosPriority(priority = HConstants.ADMIN_QOS)
1553  public CompactRegionResponse compactRegion(final RpcController controller,
1554    final CompactRegionRequest request) throws ServiceException {
1555    try {
1556      checkOpen();
1557      requestCount.increment();
1558      HRegion region = getRegion(request.getRegion());
1559      // Quota support is enabled, the requesting user is not system/super user
1560      // and a quota policy is enforced that disables compactions.
1561      if (
1562        QuotaUtil.isQuotaEnabled(getConfiguration())
1563          && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null))
1564          && this.server.getRegionServerSpaceQuotaManager()
1565            .areCompactionsDisabled(region.getTableDescriptor().getTableName())
1566      ) {
1567        throw new DoNotRetryIOException(
1568          "Compactions on this region are " + "disabled due to a space quota violation.");
1569      }
1570      region.startRegionOperation(Operation.COMPACT_REGION);
1571      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1572      boolean major = request.hasMajor() && request.getMajor();
1573      if (request.hasFamily()) {
1574        byte[] family = request.getFamily().toByteArray();
1575        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
1576          + region.getRegionInfo().getRegionNameAsString() + " and family "
1577          + Bytes.toString(family);
1578        LOG.trace(log);
1579        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
1580          CompactionLifeCycleTracker.DUMMY);
1581      } else {
1582        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
1583          + region.getRegionInfo().getRegionNameAsString();
1584        LOG.trace(log);
1585        region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
1586      }
1587      return CompactRegionResponse.newBuilder().build();
1588    } catch (IOException ie) {
1589      throw new ServiceException(ie);
1590    }
1591  }
1592
1593  @Override
1594  @QosPriority(priority = HConstants.ADMIN_QOS)
1595  public CompactionSwitchResponse compactionSwitch(RpcController controller,
1596    CompactionSwitchRequest request) throws ServiceException {
1597    rpcPreCheck("compactionSwitch");
1598    final CompactSplit compactSplitThread = server.getCompactSplitThread();
1599    requestCount.increment();
1600    boolean prevState = compactSplitThread.isCompactionsEnabled();
1601    CompactionSwitchResponse response =
1602      CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
1603    if (prevState == request.getEnabled()) {
1604      // passed in requested state is same as current state. No action required
1605      return response;
1606    }
1607    compactSplitThread.switchCompaction(request.getEnabled());
1608    return response;
1609  }
1610
1611  /**
1612   * Flush a region on the region server.
1613   * @param controller the RPC controller
1614   * @param request    the request
1615   */
1616  @Override
1617  @QosPriority(priority = HConstants.ADMIN_QOS)
1618  public FlushRegionResponse flushRegion(final RpcController controller,
1619    final FlushRegionRequest request) throws ServiceException {
1620    try {
1621      checkOpen();
1622      requestCount.increment();
1623      HRegion region = getRegion(request.getRegion());
1624      LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1625      boolean shouldFlush = true;
1626      if (request.hasIfOlderThanTs()) {
1627        shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1628      }
1629      FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1630      if (shouldFlush) {
1631        boolean writeFlushWalMarker =
1632          request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false;
1633        // Go behind the curtain so we can manage writing of the flush WAL marker
1634        HRegion.FlushResultImpl flushResult = null;
1635        if (request.hasFamily()) {
1636          List<byte[]> families = new ArrayList();
1637          families.add(request.getFamily().toByteArray());
1638          TableDescriptor tableDescriptor = region.getTableDescriptor();
1639          List<String> noSuchFamilies = families.stream()
1640            .filter(f -> !tableDescriptor.hasColumnFamily(f)).map(Bytes::toString).toList();
1641          if (!noSuchFamilies.isEmpty()) {
1642            throw new NoSuchColumnFamilyException("Column families " + noSuchFamilies
1643              + " don't exist in table " + tableDescriptor.getTableName().getNameAsString());
1644          }
1645          flushResult =
1646            region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1647        } else {
1648          flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1649        }
1650        boolean compactionNeeded = flushResult.isCompactionNeeded();
1651        if (compactionNeeded) {
1652          server.getCompactSplitThread().requestSystemCompaction(region,
1653            "Compaction through user triggered flush");
1654        }
1655        builder.setFlushed(flushResult.isFlushSucceeded());
1656        builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1657      }
1658      builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1659      return builder.build();
1660    } catch (DroppedSnapshotException ex) {
1661      // Cache flush can fail in a few places. If it fails in a critical
1662      // section, we get a DroppedSnapshotException and a replay of wal
1663      // is required. Currently the only way to do this is a restart of
1664      // the server.
1665      server.abort("Replay of WAL required. Forcing server shutdown", ex);
1666      throw new ServiceException(ex);
1667    } catch (IOException ie) {
1668      throw new ServiceException(ie);
1669    }
1670  }
1671
1672  @Override
1673  @QosPriority(priority = HConstants.ADMIN_QOS)
1674  public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1675    final GetOnlineRegionRequest request) throws ServiceException {
1676    try {
1677      checkOpen();
1678      requestCount.increment();
1679      Map<String, HRegion> onlineRegions = server.getOnlineRegions();
1680      List<RegionInfo> list = new ArrayList<>(onlineRegions.size());
1681      for (HRegion region : onlineRegions.values()) {
1682        list.add(region.getRegionInfo());
1683      }
1684      list.sort(RegionInfo.COMPARATOR);
1685      return ResponseConverter.buildGetOnlineRegionResponse(list);
1686    } catch (IOException ie) {
1687      throw new ServiceException(ie);
1688    }
1689  }
1690
1691  // Master implementation of this Admin Service differs given it is not
1692  // able to supply detail only known to RegionServer. See note on
1693  // MasterRpcServers#getRegionInfo.
1694  @Override
1695  @QosPriority(priority = HConstants.ADMIN_QOS)
1696  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
1697    final GetRegionInfoRequest request) throws ServiceException {
1698    try {
1699      checkOpen();
1700      requestCount.increment();
1701      HRegion region = getRegion(request.getRegion());
1702      RegionInfo info = region.getRegionInfo();
1703      byte[] bestSplitRow;
1704      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
1705        bestSplitRow = region.checkSplit(true).orElse(null);
1706        // when all table data are in memstore, bestSplitRow = null
1707        // try to flush region first
1708        if (bestSplitRow == null) {
1709          region.flush(true);
1710          bestSplitRow = region.checkSplit(true).orElse(null);
1711        }
1712      } else {
1713        bestSplitRow = null;
1714      }
1715      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
1716      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
1717      if (request.hasCompactionState() && request.getCompactionState()) {
1718        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
1719      }
1720      builder.setSplittable(region.isSplittable());
1721      builder.setMergeable(region.isMergeable());
1722      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
1723        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
1724      }
1725      return builder.build();
1726    } catch (IOException ie) {
1727      throw new ServiceException(ie);
1728    }
1729  }
1730
1731  @Override
1732  @QosPriority(priority = HConstants.ADMIN_QOS)
1733  public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
1734    throws ServiceException {
1735
1736    List<HRegion> regions;
1737    if (request.hasTableName()) {
1738      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
1739      regions = server.getRegions(tableName);
1740    } else {
1741      regions = server.getRegions();
1742    }
1743    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
1744    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
1745    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1746
1747    try {
1748      for (HRegion region : regions) {
1749        rLoads.add(server.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
1750      }
1751    } catch (IOException e) {
1752      throw new ServiceException(e);
1753    }
1754    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
1755    builder.addAllRegionLoads(rLoads);
1756    return builder.build();
1757  }
1758
1759  @Override
1760  @QosPriority(priority = HConstants.ADMIN_QOS)
1761  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
1762    ClearCompactionQueuesRequest request) throws ServiceException {
1763    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
1764      + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
1765    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
1766    requestCount.increment();
1767    if (clearCompactionQueues.compareAndSet(false, true)) {
1768      final CompactSplit compactSplitThread = server.getCompactSplitThread();
1769      try {
1770        checkOpen();
1771        server.getRegionServerCoprocessorHost().preClearCompactionQueues();
1772        for (String queueName : request.getQueueNameList()) {
1773          LOG.debug("clear " + queueName + " compaction queue");
1774          switch (queueName) {
1775            case "long":
1776              compactSplitThread.clearLongCompactionsQueue();
1777              break;
1778            case "short":
1779              compactSplitThread.clearShortCompactionsQueue();
1780              break;
1781            default:
1782              LOG.warn("Unknown queue name " + queueName);
1783              throw new IOException("Unknown queue name " + queueName);
1784          }
1785        }
1786        server.getRegionServerCoprocessorHost().postClearCompactionQueues();
1787      } catch (IOException ie) {
1788        throw new ServiceException(ie);
1789      } finally {
1790        clearCompactionQueues.set(false);
1791      }
1792    } else {
1793      LOG.warn("Clear compactions queue is executing by other admin.");
1794    }
1795    return respBuilder.build();
1796  }
1797
1798  /**
1799   * Get some information of the region server.
1800   * @param controller the RPC controller
1801   * @param request    the request
1802   */
1803  @Override
1804  @QosPriority(priority = HConstants.ADMIN_QOS)
1805  public GetServerInfoResponse getServerInfo(final RpcController controller,
1806    final GetServerInfoRequest request) throws ServiceException {
1807    try {
1808      checkOpen();
1809    } catch (IOException ie) {
1810      throw new ServiceException(ie);
1811    }
1812    requestCount.increment();
1813    int infoPort = server.getInfoServer() != null ? server.getInfoServer().getPort() : -1;
1814    return ResponseConverter.buildGetServerInfoResponse(server.getServerName(), infoPort);
1815  }
1816
1817  @Override
1818  @QosPriority(priority = HConstants.ADMIN_QOS)
1819  public GetStoreFileResponse getStoreFile(final RpcController controller,
1820    final GetStoreFileRequest request) throws ServiceException {
1821    try {
1822      checkOpen();
1823      HRegion region = getRegion(request.getRegion());
1824      requestCount.increment();
1825      Set<byte[]> columnFamilies;
1826      if (request.getFamilyCount() == 0) {
1827        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
1828      } else {
1829        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
1830        for (ByteString cf : request.getFamilyList()) {
1831          columnFamilies.add(cf.toByteArray());
1832        }
1833      }
1834      int nCF = columnFamilies.size();
1835      List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
1836      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1837      builder.addAllStoreFile(fileList);
1838      return builder.build();
1839    } catch (IOException ie) {
1840      throw new ServiceException(ie);
1841    }
1842  }
1843
  /**
   * Validates that an OpenRegionRequest targets this server instance by start code. Requests
   * without a start code are accepted with a warning (older clients may omit it — verify).
   * @throws ServiceException if the start code does not match this server's
   */
  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }
1851
  /**
   * Validates that a CloseRegionRequest targets this server instance by start code. Requests
   * without a start code are accepted with a warning (older clients may omit it — verify).
   * @throws ServiceException if the start code does not match this server's
   */
  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
    if (!request.hasServerStartCode()) {
      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
      return;
    }
    throwOnWrongStartCode(request.getServerStartCode());
  }
1859
1860  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
1861    // check that we are the same server that this RPC is intended for.
1862    if (server.getServerName().getStartcode() != serverStartCode) {
1863      throw new ServiceException(new DoNotRetryIOException(
1864        "This RPC was intended for a " + "different server with startCode: " + serverStartCode
1865          + ", this server is: " + server.getServerName()));
1866    }
1867  }
1868
1869  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
1870    if (req.getOpenRegionCount() > 0) {
1871      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
1872        throwOnWrongStartCode(openReq);
1873      }
1874    }
1875    if (req.getCloseRegionCount() > 0) {
1876      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
1877        throwOnWrongStartCode(closeReq);
1878      }
1879    }
1880  }
1881
1882  /**
1883   * Open asynchronously a region or a set of regions on the region server. The opening is
1884   * coordinated by ZooKeeper, and this method requires the znode to be created before being called.
1885   * As a consequence, this method should be called only from the master.
1886   * <p>
1887   * Different manages states for the region are:
1888   * </p>
1889   * <ul>
1890   * <li>region not opened: the region opening will start asynchronously.</li>
1891   * <li>a close is already in progress: this is considered as an error.</li>
1892   * <li>an open is already in progress: this new open request will be ignored. This is important
1893   * because the Master can do multiple requests if it crashes.</li>
1894   * <li>the region is already opened: this new open request will be ignored.</li>
1895   * </ul>
1896   * <p>
1897   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
1898   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
1899   * errors are put in the response as FAILED_OPENING.
1900   * </p>
1901   * @param controller the RPC controller
1902   * @param request    the request
1903   */
1904  @Override
1905  @QosPriority(priority = HConstants.ADMIN_QOS)
1906  public OpenRegionResponse openRegion(final RpcController controller,
1907    final OpenRegionRequest request) throws ServiceException {
1908    requestCount.increment();
1909    throwOnWrongStartCode(request);
1910
1911    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
1912    final int regionCount = request.getOpenInfoCount();
1913    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
1914    final boolean isBulkAssign = regionCount > 1;
1915    try {
1916      checkOpen();
1917    } catch (IOException ie) {
1918      TableName tableName = null;
1919      if (regionCount == 1) {
1920        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
1921          request.getOpenInfo(0).getRegion();
1922        if (ri != null) {
1923          tableName = ProtobufUtil.toTableName(ri.getTableName());
1924        }
1925      }
1926      if (!TableName.META_TABLE_NAME.equals(tableName)) {
1927        throw new ServiceException(ie);
1928      }
1929      // We are assigning meta, wait a little for regionserver to finish initialization.
1930      // Default to quarter of RPC timeout
1931      int timeout = server.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1932        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
1933      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
1934      synchronized (server.online) {
1935        try {
1936          while (
1937            EnvironmentEdgeManager.currentTime() <= endTime && !server.isStopped()
1938              && !server.isOnline()
1939          ) {
1940            server.online.wait(server.getMsgInterval());
1941          }
1942          checkOpen();
1943        } catch (InterruptedException t) {
1944          Thread.currentThread().interrupt();
1945          throw new ServiceException(t);
1946        } catch (IOException e) {
1947          throw new ServiceException(e);
1948        }
1949      }
1950    }
1951
1952    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1953
1954    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
1955      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
1956      TableDescriptor htd;
1957      try {
1958        String encodedName = region.getEncodedName();
1959        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
1960        final HRegion onlineRegion = server.getRegion(encodedName);
1961        if (onlineRegion != null) {
1962          // The region is already online. This should not happen any more.
1963          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
1964            + ", which is already online";
1965          LOG.warn(error);
1966          // server.abort(error);
1967          // throw new IOException(error);
1968          builder.addOpeningState(RegionOpeningState.OPENED);
1969          continue;
1970        }
1971        LOG.info("Open " + region.getRegionNameAsString());
1972
1973        final Boolean previous =
1974          server.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);
1975
1976        if (Boolean.FALSE.equals(previous)) {
1977          if (server.getRegion(encodedName) != null) {
1978            // There is a close in progress. This should not happen any more.
1979            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
1980              + ", which we are already trying to CLOSE";
1981            server.abort(error);
1982            throw new IOException(error);
1983          }
1984          server.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
1985        }
1986
1987        if (Boolean.TRUE.equals(previous)) {
1988          // An open is in progress. This is supported, but let's log this.
1989          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
1990            + ", which we are already trying to OPEN"
1991            + " - ignoring this new request for this region.");
1992        }
1993
1994        // We are opening this region. If it moves back and forth for whatever reason, we don't
1995        // want to keep returning the stale moved record while we are opening/if we close again.
1996        server.removeFromMovedRegions(region.getEncodedName());
1997
1998        if (previous == null || !previous.booleanValue()) {
1999          htd = htds.get(region.getTable());
2000          if (htd == null) {
2001            htd = server.getTableDescriptors().get(region.getTable());
2002            htds.put(region.getTable(), htd);
2003          }
2004          if (htd == null) {
2005            throw new IOException("Missing table descriptor for " + region.getEncodedName());
2006          }
2007          // If there is no action in progress, we can submit a specific handler.
2008          // Need to pass the expected version in the constructor.
2009          if (server.getExecutorService() == null) {
2010            LOG.info("No executor executorService; skipping open request");
2011          } else {
2012            if (region.isMetaRegion()) {
2013              server.getExecutorService()
2014                .submit(new OpenMetaHandler(server, server, region, htd, masterSystemTime));
2015            } else {
2016              if (regionOpenInfo.getFavoredNodesCount() > 0) {
2017                server.updateRegionFavoredNodesMapping(region.getEncodedName(),
2018                  regionOpenInfo.getFavoredNodesList());
2019              }
2020              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
2021                server.getExecutorService().submit(
2022                  new OpenPriorityRegionHandler(server, server, region, htd, masterSystemTime));
2023              } else {
2024                server.getExecutorService()
2025                  .submit(new OpenRegionHandler(server, server, region, htd, masterSystemTime));
2026              }
2027            }
2028          }
2029        }
2030
2031        builder.addOpeningState(RegionOpeningState.OPENED);
2032      } catch (IOException ie) {
2033        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
2034        if (isBulkAssign) {
2035          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
2036        } else {
2037          throw new ServiceException(ie);
2038        }
2039      }
2040    }
2041    return builder.build();
2042  }
2043
2044  /**
2045   * Warmup a region on this server. This method should only be called by Master. It synchronously
2046   * opens the region and closes the region bringing the most important pages in cache.
2047   */
2048  @Override
2049  public WarmupRegionResponse warmupRegion(final RpcController controller,
2050    final WarmupRegionRequest request) throws ServiceException {
2051    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
2052    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
2053    try {
2054      checkOpen();
2055      String encodedName = region.getEncodedName();
2056      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
2057      final HRegion onlineRegion = server.getRegion(encodedName);
2058      if (onlineRegion != null) {
2059        LOG.info("{} is online; skipping warmup", region);
2060        return response;
2061      }
2062      TableDescriptor htd = server.getTableDescriptors().get(region.getTable());
2063      if (server.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
2064        LOG.info("{} is in transition; skipping warmup", region);
2065        return response;
2066      }
2067      LOG.info("Warmup {}", region.getRegionNameAsString());
2068      HRegion.warmupHRegion(region, htd, server.getWAL(region), server.getConfiguration(), server,
2069        null);
2070    } catch (IOException ie) {
2071      LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
2072      throw new ServiceException(ie);
2073    }
2074
2075    return response;
2076  }
2077
2078  private ExtendedCellScanner getAndReset(RpcController controller) {
2079    HBaseRpcController hrc = (HBaseRpcController) controller;
2080    ExtendedCellScanner cells = hrc.cellScanner();
2081    hrc.setCellScanner(null);
2082    return cells;
2083  }
2084
  /**
   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
   * that the given mutations will be durable on the receiving RS if this method returns without any
   * exception.
   * @param controller the RPC controller
   * @param request    the request
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for
   *             compatibility with old region replica implementation. Now we will use
   *             {@code replicateToReplica} method instead.
   */
  @Deprecated
  @Override
  @QosPriority(priority = HConstants.REPLAY_QOS)
  public ReplicateWALEntryResponse replay(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    // Detach the cell payload from the controller; the WALEntries below index into it.
    ExtendedCellScanner cells = getAndReset(controller);
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      // All entries of one replay request must target a single region; enforced in the loop below.
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
      RegionCoprocessorHost coprocessorHost =
        ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
          ? region.getCoprocessorHost()
          : null; // do not invoke coprocessors if this is a secondary region replica

      // Skip adding the edits to WAL if this is a secondary region replica
      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;

      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException("Replay request contains entries from multiple "
            + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
            + entry.getKey().getEncodedRegionName());
        }
        // Replayed operations may carry nonces; report them so client retries stay idempotent.
        if (server.nonceManager != null && isPrimary) {
          long nonceGroup =
            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          server.nonceManager.reportOperationFromWal(nonceGroup, nonce,
            entry.getKey().getWriteTime());
        }
        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
        List<MutationReplay> edits =
          WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
        if (edits != null && !edits.isEmpty()) {
          // HBASE-17924
          // sort to improve lock efficiency
          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
          // Prefer the original sequence number when the edit was carried over from another WAL.
          long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
            ? entry.getKey().getOrigSequenceNumber()
            : entry.getKey().getLogSequenceNumber();
          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
          // check if it's a partial success
          for (int i = 0; result != null && i < result.length; i++) {
            if (result[i] != OperationStatus.SUCCESS) {
              throw new IOException(result[i].getExceptionMsg());
            }
          }
        }
      }

      // sync wal at the end because ASYNC_WAL is used above
      WAL wal = region.getWAL();
      if (wal != null) {
        wal.sync();
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record replay latency regardless of outcome.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
      }
    }
  }
2168
2169  /**
2170   * Replay the given changes on a secondary replica
2171   */
2172  @Override
2173  public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
2174    ReplicateWALEntryRequest request) throws ServiceException {
2175    CellScanner cells = getAndReset(controller);
2176    try {
2177      checkOpen();
2178      List<WALEntry> entries = request.getEntryList();
2179      if (entries == null || entries.isEmpty()) {
2180        // empty input
2181        return ReplicateWALEntryResponse.newBuilder().build();
2182      }
2183      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
2184      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
2185      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2186        throw new DoNotRetryIOException(
2187          "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
2188      }
2189      for (WALEntry entry : entries) {
2190        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
2191          throw new NotServingRegionException(
2192            "ReplicateToReplica request contains entries from multiple " + "regions. First region:"
2193              + regionName.toStringUtf8() + " , other region:"
2194              + entry.getKey().getEncodedRegionName());
2195        }
2196        region.replayWALEntry(entry, cells);
2197      }
2198      return ReplicateWALEntryResponse.newBuilder().build();
2199    } catch (IOException ie) {
2200      throw new ServiceException(ie);
2201    }
2202  }
2203
2204  private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
2205    ReplicationSourceService replicationSource = server.getReplicationSourceService();
2206    if (replicationSource == null || entries.isEmpty()) {
2207      return;
2208    }
2209    // We can ensure that all entries are for one peer, so only need to check one entry's
2210    // table name. if the table hit sync replication at peer side and the peer cluster
2211    // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply
2212    // those entries according to the design doc.
2213    TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray());
2214    if (
2215      replicationSource.getSyncReplicationPeerInfoProvider().checkState(table,
2216        RejectReplicationRequestStateChecker.get())
2217    ) {
2218      throw new DoNotRetryIOException(
2219        "Reject to apply to sink cluster because sync replication state of sink cluster "
2220          + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table);
2221    }
2222  }
2223
2224  /**
2225   * Replicate WAL entries on the region server.
2226   * @param controller the RPC controller
2227   * @param request    the request
2228   */
2229  @Override
2230  @QosPriority(priority = HConstants.REPLICATION_QOS)
2231  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
2232    final ReplicateWALEntryRequest request) throws ServiceException {
2233    try {
2234      checkOpen();
2235      if (server.getReplicationSinkService() != null) {
2236        requestCount.increment();
2237        List<WALEntry> entries = request.getEntryList();
2238        checkShouldRejectReplicationRequest(entries);
2239        ExtendedCellScanner cellScanner = getAndReset(controller);
2240        server.getRegionServerCoprocessorHost().preReplicateLogEntries();
2241        server.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
2242          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
2243          request.getSourceHFileArchiveDirPath());
2244        server.getRegionServerCoprocessorHost().postReplicateLogEntries();
2245        return ReplicateWALEntryResponse.newBuilder().build();
2246      } else {
2247        throw new ServiceException("Replication services are not initialized yet");
2248      }
2249    } catch (IOException ie) {
2250      throw new ServiceException(ie);
2251    }
2252  }
2253
2254  /**
2255   * Roll the WAL writer of the region server.
2256   * @param controller the RPC controller
2257   * @param request    the request
2258   */
2259  @Override
2260  @QosPriority(priority = HConstants.ADMIN_QOS)
2261  public RollWALWriterResponse rollWALWriter(final RpcController controller,
2262    final RollWALWriterRequest request) throws ServiceException {
2263    try {
2264      checkOpen();
2265      requestCount.increment();
2266      server.getRegionServerCoprocessorHost().preRollWALWriterRequest();
2267      server.getWalRoller().requestRollAll();
2268      server.getRegionServerCoprocessorHost().postRollWALWriterRequest();
2269      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
2270      return builder.build();
2271    } catch (IOException ie) {
2272      throw new ServiceException(ie);
2273    }
2274  }
2275
2276  /**
2277   * Stop the region server.
2278   * @param controller the RPC controller
2279   * @param request    the request
2280   */
2281  @Override
2282  @QosPriority(priority = HConstants.ADMIN_QOS)
2283  public StopServerResponse stopServer(final RpcController controller,
2284    final StopServerRequest request) throws ServiceException {
2285    rpcPreCheck("stopServer");
2286    requestCount.increment();
2287    String reason = request.getReason();
2288    server.stop(reason);
2289    return StopServerResponse.newBuilder().build();
2290  }
2291
2292  @Override
2293  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
2294    UpdateFavoredNodesRequest request) throws ServiceException {
2295    rpcPreCheck("updateFavoredNodes");
2296    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
2297    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
2298    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
2299      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
2300      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
2301        server.updateRegionFavoredNodesMapping(hri.getEncodedName(),
2302          regionUpdateInfo.getFavoredNodesList());
2303      }
2304    }
2305    respBuilder.setResponse(openInfoList.size());
2306    return respBuilder.build();
2307  }
2308
  /**
   * Atomically bulk load several HFiles into an open region
   * @return true if successful, false is failed but recoverably (no action)
   * @throws ServiceException if failed unrecoverably
   */
  @Override
  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
    final BulkLoadHFileRequest request) throws ServiceException {
    long start = EnvironmentEdgeManager.currentTime();
    // Loop prevention for replicated bulk loads: if our cluster id is already in the list, this
    // load has visited us before; report success without re-applying it.
    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
    if (clusterIds.contains(this.server.getClusterId())) {
      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
    } else {
      clusterIds.add(this.server.getClusterId());
    }
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
      // -1 flags "size unknown / quota not evaluated"; see the negative-size note further down.
      long sizeToBeLoaded = -1;

      // Check to see if this bulk load would exceed the space quota for this table
      if (spaceQuotaEnabled) {
        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
        SpaceViolationPolicyEnforcement enforcement =
          activeSpaceQuotas.getPolicyEnforcement(region);
        if (enforcement != null) {
          // Bulk loads must still be atomic. We must enact all or none.
          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
          for (FamilyPath familyPath : request.getFamilyPathList()) {
            filePaths.add(familyPath.getPath());
          }
          // Check if the batch of files exceeds the current quota
          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
        }
      }
      // secure bulk load
      Map<byte[], List<Path>> map =
        server.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
      builder.setLoaded(map != null);
      if (map != null) {
        // Treat any negative size as a flag to "ignore" updating the region size as that is
        // not possible to occur in real life (cannot bulk load a file with negative size)
        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
              + sizeToBeLoaded + " bytes");
          }
          // Inform space quotas of the new files for this region
          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
            sizeToBeLoaded);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Always record bulk-load latency, whether the load succeeded or failed.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
      }
    }
  }
2374
2375  @Override
2376  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
2377    PrepareBulkLoadRequest request) throws ServiceException {
2378    try {
2379      checkOpen();
2380      requestCount.increment();
2381
2382      HRegion region = getRegion(request.getRegion());
2383
2384      String bulkToken = server.getSecureBulkLoadManager().prepareBulkLoad(region, request);
2385      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
2386      builder.setBulkToken(bulkToken);
2387      return builder.build();
2388    } catch (IOException ie) {
2389      throw new ServiceException(ie);
2390    }
2391  }
2392
2393  @Override
2394  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
2395    CleanupBulkLoadRequest request) throws ServiceException {
2396    try {
2397      checkOpen();
2398      requestCount.increment();
2399
2400      HRegion region = getRegion(request.getRegion());
2401
2402      server.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
2403      return CleanupBulkLoadResponse.newBuilder().build();
2404    } catch (IOException ie) {
2405      throw new ServiceException(ie);
2406    }
2407  }
2408
2409  @Override
2410  public CoprocessorServiceResponse execService(final RpcController controller,
2411    final CoprocessorServiceRequest request) throws ServiceException {
2412    try {
2413      checkOpen();
2414      requestCount.increment();
2415      HRegion region = getRegion(request.getRegion());
2416      Message result = execServiceOnRegion(region, request.getCall());
2417      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
2418      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
2419        region.getRegionInfo().getRegionName()));
2420      // TODO: COPIES!!!!!!
2421      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
2422        org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
2423      return builder.build();
2424    } catch (IOException ie) {
2425      throw new ServiceException(ie);
2426    }
2427  }
2428
2429  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
2430    if (filePaths.isEmpty()) {
2431      // local hdfs
2432      return server.getFileSystem();
2433    }
2434    // source hdfs
2435    return new Path(filePaths.get(0)).getFileSystem(server.getConfiguration());
2436  }
2437
2438  private Message execServiceOnRegion(HRegion region,
2439    final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
2440    // ignore the passed in controller (from the serialized call)
2441    ServerRpcController execController = new ServerRpcController();
2442    return region.execService(execController, serviceCall);
2443  }
2444
2445  private boolean shouldRejectRequestsFromClient(HRegion region) {
2446    TableName table = region.getRegionInfo().getTable();
2447    ReplicationSourceService service = server.getReplicationSourceService();
2448    return service != null && service.getSyncReplicationPeerInfoProvider().checkState(table,
2449      RejectRequestsFromClientStateChecker.get());
2450  }
2451
2452  private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
2453    if (shouldRejectRequestsFromClient(region)) {
2454      throw new DoNotRetryIOException(
2455        region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state.");
2456    }
2457  }
2458
  /**
   * Get data from a table.
   * @param controller the RPC controller
   * @param request    the get request
   */
  @Override
  public GetResponse get(final RpcController controller, final GetRequest request)
    throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    OperationQuota quota = null;
    HRegion region = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    try {
      checkOpen();
      requestCount.increment();
      rpcGetRequestCount.increment();
      region = getRegion(request.getRegion());
      rejectIfInStandByState(region);

      GetResponse.Builder builder = GetResponse.newBuilder();
      ClientProtos.Get get = request.getGet();
      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
          + "reverse Scan.");
      }
      Boolean existence = null;
      Result r = null;
      quota = getRpcQuotaManager().checkBatchQuota(region, OperationQuota.OperationType.GET);

      Get clientGet = ProtobufUtil.toGet(get);
      // For existence-only gets, give the preExists coprocessor hook a chance to answer directly.
      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
        existence = region.getCoprocessorHost().preExists(clientGet);
      }
      if (existence == null) {
        if (context != null) {
          r = get(clientGet, (region), null, context);
        } else {
          // for test purpose
          r = region.get(clientGet);
        }
        if (get.getExistenceOnly()) {
          boolean exists = r.getExists();
          if (region.getCoprocessorHost() != null) {
            exists = region.getCoprocessorHost().postExists(clientGet, exists);
          }
          existence = exists;
        }
      }
      if (existence != null) {
        // Existence-only response: encode just the boolean (marked stale when served by a replica).
        ClientProtos.Result pbr =
          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
        builder.setResult(pbr);
      } else if (r != null) {
        ClientProtos.Result pbr;
        // When the client supports cell blocks, ship the cells outside the protobuf message.
        if (
          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
        ) {
          pbr = ProtobufUtil.toResultNoData(r);
          ((HBaseRpcController) controller).setCellScanner(
            PrivateCellUtil.createExtendedCellScanner(ClientInternalHelper.getExtendedRawCells(r)));
          addSize(context, r);
        } else {
          pbr = ProtobufUtil.toResult(r);
        }
        builder.setResult(pbr);
      }
      // r.cells is null when an table.exists(get) call
      if (r != null && r.rawCells() != null) {
        quota.addGetResult(r);
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record latency and release the quota whether or not the get succeeded.
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null && region != null) {
        long blockBytesScanned = context != null ? context.getBlockBytesScanned() : 0;
        metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
          blockBytesScanned);
      }
      if (quota != null) {
        quota.close();
      }
    }
  }
2550
2551  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
2552    RpcCallContext context) throws IOException {
2553    region.prepareGet(get);
2554    boolean stale = region.getRegionInfo().getReplicaId() != 0;
2555
2556    // This method is almost the same as HRegion#get.
2557    List<Cell> results = new ArrayList<>();
2558    // pre-get CP hook
2559    if (region.getCoprocessorHost() != null) {
2560      if (region.getCoprocessorHost().preGet(get, results)) {
2561        region.metricsUpdateForGet();
2562        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
2563          stale);
2564      }
2565    }
2566    Scan scan = new Scan(get);
2567    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
2568      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
2569    }
2570    RegionScannerImpl scanner = null;
2571    long blockBytesScannedBefore = context.getBlockBytesScanned();
2572    try {
2573      scanner = region.getScanner(scan);
2574      scanner.next(results);
2575    } finally {
2576      if (scanner != null) {
2577        if (closeCallBack == null) {
2578          // If there is a context then the scanner can be added to the current
2579          // RpcCallContext. The rpc callback will take care of closing the
2580          // scanner, for eg in case
2581          // of get()
2582          context.setCallBack(scanner);
2583        } else {
2584          // The call is from multi() where the results from the get() are
2585          // aggregated and then send out to the
2586          // rpc. The rpccall back will close all such scanners created as part
2587          // of multi().
2588          closeCallBack.addScanner(scanner);
2589        }
2590      }
2591    }
2592
2593    // post-get CP hook
2594    if (region.getCoprocessorHost() != null) {
2595      region.getCoprocessorHost().postGet(get, results);
2596    }
2597    region.metricsUpdateForGet();
2598
2599    Result r =
2600      Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
2601    if (get.isQueryMetricsEnabled()) {
2602      long blockBytesScanned = context.getBlockBytesScanned() - blockBytesScannedBefore;
2603      r.setMetrics(new QueryMetrics(blockBytesScanned));
2604    }
2605    return r;
2606  }
2607
2608  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
2609    int sum = 0;
2610    String firstRegionName = null;
2611    for (RegionAction regionAction : request.getRegionActionList()) {
2612      if (sum == 0) {
2613        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
2614      }
2615      sum += regionAction.getActionCount();
2616    }
2617    if (sum > rowSizeWarnThreshold) {
2618      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
2619        + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
2620        + RpcServer.getRequestUserName().orElse(null) + "/"
2621        + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
2622      if (rejectRowsWithSizeOverThreshold) {
2623        throw new ServiceException(
2624          "Rejecting large batch operation for current batch with firstRegionName: "
2625            + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
2626            + rowSizeWarnThreshold);
2627      }
2628    }
2629  }
2630
  /**
   * Fails an entire RegionAction: records the exception in RPC metrics, attaches it to the
   * response, and skips past this RegionAction's cells in the CellScanner so subsequent
   * RegionActions still line up with their payload.
   */
  private void failRegionAction(MultiResponse.Builder responseBuilder,
    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
    CellScanner cellScanner, Throwable error) {
    rpcServer.getMetrics().exception(error);
    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
    // All Mutations in this RegionAction not executed as we can not see the Region online here
    // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
    // corresponding to these Mutations.
    if (cellScanner != null) {
      skipCellsForMutations(regionAction.getActionList(), cellScanner);
    }
  }
2644
2645  private boolean isReplicationRequest(Action action) {
2646    // replication request can only be put or delete.
2647    if (!action.hasMutation()) {
2648      return false;
2649    }
2650    MutationProto mutation = action.getMutation();
2651    MutationType type = mutation.getMutateType();
2652    if (type != MutationType.PUT && type != MutationType.DELETE) {
2653      return false;
2654    }
2655    // replication will set a special attribute so we can make use of it to decide whether a request
2656    // is for replication.
2657    return mutation.getAttributeList().stream().map(p -> p.getName())
2658      .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
2659  }
2660
2661  /**
2662   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2663   * @param rpcc    the RPC controller
2664   * @param request the multi request
2665   */
2666  @Override
2667  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2668    throws ServiceException {
2669    try {
2670      checkOpen();
2671    } catch (IOException ie) {
2672      throw new ServiceException(ie);
2673    }
2674
2675    checkBatchSizeAndLogLargeSize(request);
2676
2677    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2678    // It is also the conduit via which we pass back data.
2679    HBaseRpcController controller = (HBaseRpcController) rpcc;
2680    CellScanner cellScanner = controller != null ? getAndReset(controller) : null;
2681
2682    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2683
2684    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2685    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2686    this.rpcMultiRequestCount.increment();
2687    this.requestCount.increment();
2688    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2689
2690    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
2691    // following logic is for backward compatibility as old clients still use
2692    // MultiRequest#condition in case of checkAndMutate with RowMutations.
2693    if (request.hasCondition()) {
2694      if (request.getRegionActionList().isEmpty()) {
2695        // If the region action list is empty, do nothing.
2696        responseBuilder.setProcessed(true);
2697        return responseBuilder.build();
2698      }
2699
2700      RegionAction regionAction = request.getRegionAction(0);
2701
2702      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
2703      // we can assume regionAction.getAtomic() is true here.
2704      assert regionAction.getAtomic();
2705
2706      OperationQuota quota;
2707      HRegion region;
2708      RegionSpecifier regionSpecifier = regionAction.getRegion();
2709
2710      try {
2711        region = getRegion(regionSpecifier);
2712        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
2713          regionAction.hasCondition());
2714      } catch (IOException e) {
2715        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2716        return responseBuilder.build();
2717      }
2718
2719      try {
2720        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
2721        // We only allow replication in standby state and it will not set the atomic flag.
2722        if (rejectIfFromClient) {
2723          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2724            new DoNotRetryIOException(
2725              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2726          return responseBuilder.build();
2727        }
2728
2729        try {
2730          CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2731            cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
2732          responseBuilder.setProcessed(result.isSuccess());
2733          ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2734            ClientProtos.ResultOrException.newBuilder();
2735          for (int i = 0; i < regionAction.getActionCount(); i++) {
2736            // To unify the response format with doNonAtomicRegionMutation and read through
2737            // client's AsyncProcess we have to add an empty result instance per operation
2738            resultOrExceptionOrBuilder.clear();
2739            resultOrExceptionOrBuilder.setIndex(i);
2740            regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2741          }
2742        } catch (IOException e) {
2743          rpcServer.getMetrics().exception(e);
2744          // As it's an atomic operation with a condition, we may expect it's a global failure.
2745          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2746        }
2747      } finally {
2748        quota.close();
2749      }
2750
2751      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2752      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2753      if (regionLoadStats != null) {
2754        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
2755          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
2756      }
2757      return responseBuilder.build();
2758    }
2759
2760    // this will contain all the cells that we need to return. It's created later, if needed.
2761    List<ExtendedCellScannable> cellsToReturn = null;
2762    RegionScannersCloseCallBack closeCallBack = null;
2763    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2764    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
2765      new HashMap<>(request.getRegionActionCount());
2766
2767    for (RegionAction regionAction : request.getRegionActionList()) {
2768      OperationQuota quota;
2769      HRegion region;
2770      RegionSpecifier regionSpecifier = regionAction.getRegion();
2771      regionActionResultBuilder.clear();
2772
2773      try {
2774        region = getRegion(regionSpecifier);
2775        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
2776          regionAction.hasCondition());
2777      } catch (IOException e) {
2778        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2779        continue; // For this region it's a failure.
2780      }
2781
2782      try {
2783        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
2784
2785        if (regionAction.hasCondition()) {
2786          // We only allow replication in standby state and it will not set the atomic flag.
2787          if (rejectIfFromClient) {
2788            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2789              new DoNotRetryIOException(
2790                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2791            continue;
2792          }
2793
2794          try {
2795            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2796              ClientProtos.ResultOrException.newBuilder();
2797            if (regionAction.getActionCount() == 1) {
2798              CheckAndMutateResult result =
2799                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
2800                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
2801              regionActionResultBuilder.setProcessed(result.isSuccess());
2802              resultOrExceptionOrBuilder.setIndex(0);
2803              if (result.getResult() != null) {
2804                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
2805              }
2806
2807              if (result.getMetrics() != null) {
2808                resultOrExceptionOrBuilder
2809                  .setMetrics(ProtobufUtil.toQueryMetrics(result.getMetrics()));
2810              }
2811
2812              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2813            } else {
2814              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2815                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
2816              regionActionResultBuilder.setProcessed(result.isSuccess());
2817              for (int i = 0; i < regionAction.getActionCount(); i++) {
2818                if (i == 0 && result.getResult() != null) {
2819                  // Set the result of the Increment/Append operations to the first element of the
2820                  // ResultOrException list
2821                  resultOrExceptionOrBuilder.setIndex(i);
2822                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
2823                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
2824                  continue;
2825                }
2826                // To unify the response format with doNonAtomicRegionMutation and read through
2827                // client's AsyncProcess we have to add an empty result instance per operation
2828                resultOrExceptionOrBuilder.clear();
2829                resultOrExceptionOrBuilder.setIndex(i);
2830                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2831              }
2832            }
2833          } catch (IOException e) {
2834            rpcServer.getMetrics().exception(e);
2835            // As it's an atomic operation with a condition, we may expect it's a global failure.
2836            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2837          }
2838        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2839          // We only allow replication in standby state and it will not set the atomic flag.
2840          if (rejectIfFromClient) {
2841            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2842              new DoNotRetryIOException(
2843                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2844            continue;
2845          }
2846          try {
2847            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
2848              cellScanner, nonceGroup, spaceQuotaEnforcement);
2849            regionActionResultBuilder.setProcessed(true);
2850            // We no longer use MultiResponse#processed. Instead, we use
2851            // RegionActionResult#processed. This is for backward compatibility for old clients.
2852            responseBuilder.setProcessed(true);
2853          } catch (IOException e) {
2854            rpcServer.getMetrics().exception(e);
2855            // As it's atomic, we may expect it's a global failure.
2856            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2857          }
2858        } else {
2859          if (
2860            rejectIfFromClient && regionAction.getActionCount() > 0
2861              && !isReplicationRequest(regionAction.getAction(0))
2862          ) {
2863            // fail if it is not a replication request
2864            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2865              new DoNotRetryIOException(
2866                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2867            continue;
2868          }
2869          // doNonAtomicRegionMutation manages the exception internally
2870          if (context != null && closeCallBack == null) {
2871            // An RpcCallBack that creates a list of scanners that needs to perform callBack
2872            // operation on completion of multiGets.
2873            // Set this only once
2874            closeCallBack = new RegionScannersCloseCallBack();
2875            context.setCallBack(closeCallBack);
2876          }
2877          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2878            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
2879            spaceQuotaEnforcement);
2880        }
2881      } finally {
2882        quota.close();
2883      }
2884
2885      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2886      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2887      if (regionLoadStats != null) {
2888        regionStats.put(regionSpecifier, regionLoadStats);
2889      }
2890    }
2891    // Load the controller with the Cells to return.
2892    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2893      controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(cellsToReturn));
2894    }
2895
2896    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
2897    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
2898      builder.addRegion(stat.getKey());
2899      builder.addStat(stat.getValue());
2900    }
2901    responseBuilder.setRegionStatistics(builder);
2902    return responseBuilder.build();
2903  }
2904
2905  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2906    if (cellScanner == null) {
2907      return;
2908    }
2909    for (Action action : actions) {
2910      skipCellsForMutation(action, cellScanner);
2911    }
2912  }
2913
2914  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2915    if (cellScanner == null) {
2916      return;
2917    }
2918    try {
2919      if (action.hasMutation()) {
2920        MutationProto m = action.getMutation();
2921        if (m.hasAssociatedCellCount()) {
2922          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2923            cellScanner.advance();
2924          }
2925        }
2926      }
2927    } catch (IOException e) {
2928      // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2929      // marked as failed as we could not see the Region here. At client side the top level
2930      // RegionAction exception will be considered first.
2931      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2932    }
2933  }
2934
2935  /**
2936   * Mutate data in a table.
2937   * @param rpcc    the RPC controller
2938   * @param request the mutate request
2939   */
2940  @Override
2941  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
2942    throws ServiceException {
2943    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2944    // It is also the conduit via which we pass back data.
2945    HBaseRpcController controller = (HBaseRpcController) rpcc;
2946    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2947    OperationQuota quota = null;
2948    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2949    // Clear scanner so we are not holding on to reference across call.
2950    if (controller != null) {
2951      controller.setCellScanner(null);
2952    }
2953    try {
2954      checkOpen();
2955      requestCount.increment();
2956      rpcMutateRequestCount.increment();
2957      HRegion region = getRegion(request.getRegion());
2958      rejectIfInStandByState(region);
2959      MutateResponse.Builder builder = MutateResponse.newBuilder();
2960      MutationProto mutation = request.getMutation();
2961      if (!region.getRegionInfo().isMetaRegion()) {
2962        server.getMemStoreFlusher().reclaimMemStoreMemory();
2963      }
2964      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2965      OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request);
2966      quota = getRpcQuotaManager().checkBatchQuota(region, operationType);
2967      ActivePolicyEnforcement spaceQuotaEnforcement =
2968        getSpaceQuotaManager().getActiveEnforcements();
2969
2970      if (request.hasCondition()) {
2971        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
2972          request.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
2973        builder.setProcessed(result.isSuccess());
2974        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
2975        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
2976        if (clientCellBlockSupported) {
2977          addSize(context, result.getResult());
2978        }
2979        if (result.getMetrics() != null) {
2980          builder.setMetrics(ProtobufUtil.toQueryMetrics(result.getMetrics()));
2981        }
2982      } else {
2983        Result r = null;
2984        Boolean processed = null;
2985        MutationType type = mutation.getMutateType();
2986        switch (type) {
2987          case APPEND:
2988            // TODO: this doesn't actually check anything.
2989            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
2990              context);
2991            break;
2992          case INCREMENT:
2993            // TODO: this doesn't actually check anything.
2994            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
2995              context);
2996            break;
2997          case PUT:
2998            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
2999            processed = Boolean.TRUE;
3000            break;
3001          case DELETE:
3002            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
3003            processed = Boolean.TRUE;
3004            break;
3005          default:
3006            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
3007        }
3008        if (processed != null) {
3009          builder.setProcessed(processed);
3010        }
3011        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
3012        addResult(builder, r, controller, clientCellBlockSupported);
3013        if (clientCellBlockSupported) {
3014          addSize(context, r);
3015        }
3016      }
3017      return builder.build();
3018    } catch (IOException ie) {
3019      server.checkFileSystem();
3020      throw new ServiceException(ie);
3021    } finally {
3022      if (quota != null) {
3023        quota.close();
3024      }
3025    }
3026  }
3027
3028  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
3029    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
3030    long before = EnvironmentEdgeManager.currentTime();
3031    Put put = ProtobufUtil.toPut(mutation, cellScanner);
3032    checkCellSizeLimit(region, put);
3033    spaceQuota.getPolicyEnforcement(region).check(put);
3034    quota.addMutation(put);
3035    region.put(put);
3036
3037    MetricsRegionServer metricsRegionServer = server.getMetrics();
3038    if (metricsRegionServer != null) {
3039      long after = EnvironmentEdgeManager.currentTime();
3040      metricsRegionServer.updatePut(region, after - before);
3041    }
3042  }
3043
3044  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
3045    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
3046    long before = EnvironmentEdgeManager.currentTime();
3047    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
3048    checkCellSizeLimit(region, delete);
3049    spaceQuota.getPolicyEnforcement(region).check(delete);
3050    quota.addMutation(delete);
3051    region.delete(delete);
3052
3053    MetricsRegionServer metricsRegionServer = server.getMetrics();
3054    if (metricsRegionServer != null) {
3055      long after = EnvironmentEdgeManager.currentTime();
3056      metricsRegionServer.updateDelete(region, after - before);
3057    }
3058  }
3059
  /**
   * Execute a single check-and-mutate operation against the given region.
   * <p>
   * Coprocessors may short-circuit: if {@code preCheckAndMutate} returns a non-null result, the
   * region operation and {@code postCheckAndMutate} are both skipped.
   * @param region      the region to operate on
   * @param quota       rpc quota the mutation is charged against
   * @param mutation    protobuf mutation carrying the action and optional nonce
   * @param cellScanner cell payload shipped outside protobuf; may back the mutation's cells
   * @param condition   the condition that must hold for the mutation to be applied
   * @param nonceGroup  client nonce group used for operation idempotency
   * @param spaceQuota  active space quota policies to check before writing
   * @param context     current rpc call context used for block-bytes-scanned metrics; may be null
   * @return the check-and-mutate result (from the coprocessor pre-hook or the region)
   * @throws IOException if a limit/quota check or the operation itself fails
   */
  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
    quota.addMutation((Mutation) checkAndMutate.getAction());

    // A non-null result from the pre-hook takes over; otherwise run the region operation and
    // give the post-hook a chance to transform the result.
    CheckAndMutateResult result = null;
    if (region.getCoprocessorHost() != null) {
      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
    }
    if (result == null) {
      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
      if (region.getCoprocessorHost() != null) {
        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
      }
    }
    MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      long blockBytesScanned =
        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
      metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned);

      // Also record the type-specific latency metric for put/delete-based conditions.
      MutationType type = mutation.getMutateType();
      switch (type) {
        case PUT:
          metricsRegionServer.updateCheckAndPut(region, after - before);
          break;
        case DELETE:
          metricsRegionServer.updateCheckAndDelete(region, after - before);
          break;
        default:
          break;
      }
    }
    return result;
  }
3102
  // This is used to keep compatible with the old client implementation. Consider remove it if we
  // decide to drop the support of the client that still sends close request to a region scanner
  // which has already been exhausted.
  @Deprecated
  private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {

    private static final long serialVersionUID = -4305297078988180130L;

    // This is a pre-allocated singleton thrown from many call sites; a captured stack trace
    // would be meaningless (and costly), so suppress it by returning this without filling.
    @Override
    public synchronized Throwable fillInStackTrace() {
      return this;
    }
  };
3116
  /**
   * Look up the outstanding scanner for the given scan request and validate it is still usable.
   * @param request scan request carrying the scanner id (and optionally the nextCallSeq)
   * @return the holder of the live scanner
   * @throws OutOfOrderScannerNextException if the scanner was already closed and the client's
   *           nextCallSeq indicates an incorrect retry
   * @throws UnknownScannerException if the scanner id is not known at all (wrong id, expired
   *           lease, server closing, or restart)
   * @throws NotServingRegionException if the region instance changed underneath the scanner
   * @throws IOException if the region must reject requests (e.g. standby state)
   */
  private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
    String scannerName = toScannerName(request.getScannerId());
    RegionScannerHolder rsh = this.scanners.get(scannerName);
    if (rsh == null) {
      // just ignore the next or close request if scanner does not exists.
      Long lastCallSeq = closedScanners.getIfPresent(scannerName);
      if (lastCallSeq != null) {
        // Check the sequence number to catch if the last call was incorrectly retried.
        // The only allowed scenario is when the scanner is exhausted and one more scan
        // request arrives - in this case returning 0 rows is correct.
        if (request.hasNextCallSeq() && request.getNextCallSeq() != lastCallSeq + 1) {
          throw new OutOfOrderScannerNextException("Expected nextCallSeq for closed request: "
            + (lastCallSeq + 1) + " But the nextCallSeq got from client: "
            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
        } else {
          // Legitimate extra call on an exhausted scanner; the pre-allocated singleton is
          // caught upstream to return an empty response without a stack-trace cost.
          throw SCANNER_ALREADY_CLOSED;
        }
      } else {
        LOG.warn("Client tried to access missing scanner " + scannerName);
        throw new UnknownScannerException(
          "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
            + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
            + "long wait between consecutive client checkins, c) Server may be closing down, "
            + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
            + "possible fix would be increasing the value of"
            + "'hbase.client.scanner.timeout.period' configuration.");
      }
    }
    rejectIfInStandByState(rsh.r);
    RegionInfo hri = rsh.s.getRegionInfo();
    // Yes, should be the same instance
    if (server.getOnlineRegion(hri.getRegionName()) != rsh.r) {
      // The region was reopened/moved under the scanner; close the stale scanner and cancel its
      // lease before telling the client to retry elsewhere.
      String msg = "Region has changed on the scanner " + scannerName + ": regionName="
        + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r;
      LOG.warn(msg + ", closing...");
      scanners.remove(scannerName);
      try {
        rsh.s.close();
      } catch (IOException e) {
        LOG.warn("Getting exception closing " + scannerName, e);
      } finally {
        try {
          server.getLeaseManager().cancelLease(scannerName);
        } catch (LeaseException e) {
          LOG.warn("Getting exception closing " + scannerName, e);
        }
      }
      throw new NotServingRegionException(msg);
    }
    return rsh;
  }
3168
3169  /**
3170   * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
3171   *         value.
3172   */
3173  private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request, HRegion region,
3174    ScanResponse.Builder builder) throws IOException {
3175    rejectIfInStandByState(region);
3176    ClientProtos.Scan protoScan = request.getScan();
3177    boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
3178    Scan scan = ProtobufUtil.toScan(protoScan);
3179    // if the request doesn't set this, get the default region setting.
3180    if (!isLoadingCfsOnDemandSet) {
3181      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
3182    }
3183
3184    if (!scan.hasFamilies()) {
3185      // Adding all families to scanner
3186      for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
3187        scan.addFamily(family);
3188      }
3189    }
3190    if (region.getCoprocessorHost() != null) {
3191      // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
3192      // wrapper for the core created RegionScanner
3193      region.getCoprocessorHost().preScannerOpen(scan);
3194    }
3195    RegionScannerImpl coreScanner = region.getScanner(scan);
3196    Shipper shipper = coreScanner;
3197    RegionScanner scanner = coreScanner;
3198    try {
3199      if (region.getCoprocessorHost() != null) {
3200        scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
3201      }
3202    } catch (Exception e) {
3203      // Although region coprocessor is for advanced users and they should take care of the
3204      // implementation to not damage the HBase system, closing the scanner on exception here does
3205      // not have any bad side effect, so let's do it
3206      scanner.close();
3207      throw e;
3208    }
3209    long scannerId = scannerIdGenerator.generateNewScannerId();
3210    builder.setScannerId(scannerId);
3211    builder.setMvccReadPoint(scanner.getMvccReadPoint());
3212    builder.setTtl(scannerLeaseTimeoutPeriod);
3213    String scannerName = toScannerName(scannerId);
3214
3215    boolean fullRegionScan =
3216      !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region);
3217
3218    return new Pair<String, RegionScannerHolder>(scannerName,
3219      addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan));
3220  }
3221
3222  /**
3223   * The returned String is used as key doing look up of outstanding Scanners in this Servers'
3224   * this.scanners, the Map of outstanding scanners and their current state.
3225   * @param scannerId A scanner long id.
3226   * @return The long id as a String.
3227   */
3228  private static String toScannerName(long scannerId) {
3229    return Long.toString(scannerId);
3230  }
3231
3232  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
3233    throws OutOfOrderScannerNextException {
3234    // if nextCallSeq does not match throw Exception straight away. This needs to be
3235    // performed even before checking of Lease.
3236    // See HBASE-5974
3237    if (request.hasNextCallSeq()) {
3238      long callSeq = request.getNextCallSeq();
3239      if (!rsh.incNextCallSeq(callSeq)) {
3240        throw new OutOfOrderScannerNextException(
3241          "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: "
3242            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
3243      }
3244    }
3245  }
3246
3247  private void addScannerLeaseBack(LeaseManager.Lease lease) {
3248    try {
3249      server.getLeaseManager().addLease(lease);
3250    } catch (LeaseStillHeldException e) {
3251      // should not happen as the scanner id is unique.
3252      throw new AssertionError(e);
3253    }
3254  }
3255
3256  // visible for testing only
3257  long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller,
3258    boolean allowHeartbeatMessages) {
3259    // Set the time limit to be half of the more restrictive timeout value (one of the
3260    // timeout values must be positive). In the event that both values are positive, the
3261    // more restrictive of the two is used to calculate the limit.
3262    if (allowHeartbeatMessages) {
3263      long now = EnvironmentEdgeManager.currentTime();
3264      long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now);
3265      if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) {
3266        long timeLimitDelta;
3267        if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) {
3268          timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout);
3269        } else {
3270          timeLimitDelta =
3271            scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout;
3272        }
3273
3274        // Use half of whichever timeout value was more restrictive... But don't allow
3275        // the time limit to be less than the allowable minimum (could cause an
3276        // immediate timeout before scanning any data).
3277        timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
3278        return now + timeLimitDelta;
3279      }
3280    }
3281    // Default value of timeLimit is negative to indicate no timeLimit should be
3282    // enforced.
3283    return -1L;
3284  }
3285
3286  private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) {
3287    long timeout;
3288    if (controller != null && controller.getCallTimeout() > 0) {
3289      timeout = controller.getCallTimeout();
3290    } else if (rpcTimeout > 0) {
3291      timeout = rpcTimeout;
3292    } else {
3293      return -1;
3294    }
3295    if (call != null) {
3296      timeout -= (now - call.getReceiveTime());
3297    }
3298    // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high.
3299    // return minimum value here in that case so we count this in calculating the final delta.
3300    return Math.max(minimumScanTimeLimitDelta, timeout);
3301  }
3302
3303  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
3304    ScannerContext scannerContext, ScanResponse.Builder builder) {
3305    if (numOfCompleteRows >= limitOfRows) {
3306      if (LOG.isTraceEnabled()) {
3307        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows
3308          + " scannerContext: " + scannerContext);
3309      }
3310      builder.setMoreResults(false);
3311    }
3312  }
3313
  /**
   * Core scan loop for one ScanRequest: repeatedly calls {@code scanner.nextRaw(...)} until a
   * size, time, batch or row limit is reached, the caller-requested number of results
   * ({@code maxResults}) is collected, or the region is exhausted. Collected rows are appended to
   * {@code results}. Whether the region has more rows is reported through
   * {@code builder.setMoreResultsInRegion(...)} rather than a return value.
   * @param controller         the RPC controller (used for time-limit calculation)
   * @param request            the scan request (partial/heartbeat capability flags, metrics flag)
   * @param rsh                holder for the region scanner and per-scanner bookkeeping
   * @param maxQuotaResultSize max result size from server config and quotas (excludes user max)
   * @param maxResults         max number of rows (scan caching) to collect in this call
   * @param limitOfRows        limit of complete rows for the whole scan, or -1 for no limit
   * @param results            output list; may already contain results from coprocessor preScannerNext
   * @param builder            response builder; moreResults/moreResultsInRegion/heartbeat set here
   * @param rpcCall            current RPC call context, or null if none
   * @throws IOException on scanner failure; serverside metrics are still updated in finally
   */
  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
    long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
    ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
    HRegion region = rsh.r;
    RegionScanner scanner = rsh.s;
    long maxResultSize;
    if (scanner.getMaxResultSize() > 0) {
      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
    } else {
      maxResultSize = maxQuotaResultSize;
    }
    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
    // arbitrary 32. TODO: keep record of general size of results being returned.
    ArrayList<Cell> values = new ArrayList<>(32);
    region.startRegionOperation(Operation.SCAN);
    long before = EnvironmentEdgeManager.currentTime();
    // Used to check if we've matched the row limit set on the Scan
    int numOfCompleteRows = 0;
    // Count of times we call nextRaw; can be > numOfCompleteRows.
    int numOfNextRawCalls = 0;
    try {
      int numOfResults = 0;
      // Serialize access to the scanner; concurrent next calls on the same scanner id are not
      // supported.
      synchronized (scanner) {
        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
        boolean clientHandlesPartials =
          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
        boolean clientHandlesHeartbeats =
          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();

        // On the server side we must ensure that the correct ordering of partial results is
        // returned to the client to allow them to properly reconstruct the partial results.
        // If the coprocessor host is adding to the result list, we cannot guarantee the
        // correct ordering of partial results and so we prevent partial results from being
        // formed.
        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
        boolean moreRows = false;

        // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
        // certain time threshold on the server. When the time threshold is exceeded, the
        // server stops the scan and sends back whatever Results it has accumulated within
        // that time period (may be empty). Since heartbeat messages have the potential to
        // create partial Results (in the event that the timeout occurs in the middle of a
        // row), we must only generate heartbeat messages when the client can handle both
        // heartbeats AND partials
        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;

        long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);

        final LimitScope sizeScope =
          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
        final LimitScope timeScope =
          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;

        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();

        // Configure with limits for this RPC. Set keep progress true since size progress
        // towards size limit should be kept between calls to nextRaw
        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
        // of heap size occupied by cells(being read). Cell data means its key and value parts.
        // maxQuotaResultSize - max results just from server side configuration and quotas, without
        // user's specified max. We use this for evaluating limits based on blocks (not cells).
        // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
        // cell or block size from maximum here so we adhere to total limits of request.
        // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
        // have accumulated block bytes. If not, this will be 0 for block size.
        long maxCellSize = maxResultSize;
        long maxBlockSize = maxQuotaResultSize;
        if (rpcCall != null) {
          maxBlockSize -= rpcCall.getBlockBytesScanned();
          maxCellSize -= rpcCall.getResponseCellSize();
        }

        contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
        contextBuilder.setBatchLimit(scanner.getBatch());
        contextBuilder.setTimeLimit(timeScope, timeLimit);
        contextBuilder.setTrackMetrics(trackMetrics);
        ScannerContext scannerContext = contextBuilder.build();
        boolean limitReached = false;
        long blockBytesScannedBefore = 0;
        while (numOfResults < maxResults) {
          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
          // batch limit is a limit on the number of cells per Result. Thus, if progress is
          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
          // reset the batch progress between nextRaw invocations since we don't want the
          // batch progress from previous calls to affect future calls
          scannerContext.setBatchProgress(0);
          assert values.isEmpty();

          // Collect values to be returned here
          moreRows = scanner.nextRaw(values, scannerContext);

          // Per-nextRaw block bytes, used below for per-result query metrics if requested.
          long blockBytesScanned = scannerContext.getBlockSizeProgress() - blockBytesScannedBefore;
          blockBytesScannedBefore = scannerContext.getBlockSizeProgress();

          if (rpcCall == null) {
            // When there is no RpcCallContext,copy EC to heap, then the scanner would close,
            // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
            // buffers.See more details in HBASE-26036.
            CellUtil.cloneIfNecessary(values);
          }
          numOfNextRawCalls++;

          if (!values.isEmpty()) {
            if (limitOfRows > 0) {
              // First we need to check if the last result is partial and we have a row change. If
              // so then we need to increase the numOfCompleteRows.
              if (results.isEmpty()) {
                if (
                  rsh.rowOfLastPartialResult != null
                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              } else {
                Result lastResult = results.get(results.size() - 1);
                if (
                  lastResult.mayHaveMoreCellsInRow()
                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              }
              // checkLimitOfRows sets moreResults=false once the row limit is hit; stop scanning.
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);

            if (request.getScan().getQueryMetricsEnabled()) {
              builder.addQueryMetrics(ClientProtos.QueryMetrics.newBuilder()
                .setBlockBytesScanned(blockBytesScanned).build());
            }

            results.add(r);
            numOfResults++;
            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
              numOfCompleteRows++;
              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
          } else if (!moreRows && !results.isEmpty()) {
            // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
            // last result is false. Otherwise it's possible that: the first nextRaw returned
            // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
            // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
            // return with moreRows=false, which means moreResultsInRegion would be false, it will
            // be a contradictory state (HBASE-21206).
            int lastIdx = results.size() - 1;
            Result r = results.get(lastIdx);
            if (r.mayHaveMoreCellsInRow()) {
              results.set(lastIdx, ClientInternalHelper.createResult(
                ClientInternalHelper.getExtendedRawCells(r), r.getExists(), r.isStale(), false));
            }
          }
          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
          boolean resultsLimitReached = numOfResults >= maxResults;
          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

          if (limitReached || !moreRows) {
            // With block size limit, we may exceed size limit without collecting any results.
            // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
            // or cursor if results were collected, for example for cell size or heap size limits.
            boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
            // We only want to mark a ScanResponse as a heartbeat message in the event that
            // there are more values to be read server side. If there aren't more values,
            // marking it as a heartbeat is wasteful because the client will need to issue
            // another ScanRequest only to realize that they already have all the values
            if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
              // Heartbeat messages occur when the time limit has been reached, or size limit has
              // been reached before collecting any results. This can happen for heavily filtered
              // scans which scan over too many blocks.
              builder.setHeartbeatMessage(true);
              if (rsh.needCursor) {
                Cell cursorCell = scannerContext.getLastPeekedCell();
                if (cursorCell != null) {
                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
                }
              }
            }
            break;
          }
          values.clear();
        }
        if (rpcCall != null) {
          rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
        }
        builder.setMoreResultsInRegion(moreRows);
        // Check to see if the client requested that we track metrics server side. If the
        // client requested metrics, retrieve the metrics from the scanner context.
        if (trackMetrics) {
          // rather than increment yet another counter in StoreScanner, just set the value here
          // from block size progress before writing into the response
          scannerContext.getMetrics().countOfBlockBytesScanned
            .set(scannerContext.getBlockSizeProgress());
          if (rpcCall != null) {
            scannerContext.getMetrics().fsReadTime.set(rpcCall.getFsReadTime());
          }
          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();

          for (Entry<String, Long> entry : metrics.entrySet()) {
            pairBuilder.setName(entry.getKey());
            pairBuilder.setValue(entry.getValue());
            metricBuilder.addMetrics(pairBuilder.build());
          }

          builder.setScanMetrics(metricBuilder.build());
        }
      }
    } finally {
      region.closeRegionOperation();
      // Update serverside metrics, even on error.
      long end = EnvironmentEdgeManager.currentTime();

      long responseCellSize = 0;
      long blockBytesScanned = 0;
      if (rpcCall != null) {
        responseCellSize = rpcCall.getResponseCellSize();
        blockBytesScanned = rpcCall.getBlockBytesScanned();
        rsh.updateBlockBytesScanned(blockBytesScanned);
      }
      region.getMetrics().updateScan();
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned);
        metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls);
      }
    }
    // coprocessor postNext hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
    }
  }
3560
3561  /**
3562   * Scan data in a table.
3563   * @param controller the RPC controller
3564   * @param request    the scan request
3565   */
3566  @Override
3567  public ScanResponse scan(final RpcController controller, final ScanRequest request)
3568    throws ServiceException {
3569    if (controller != null && !(controller instanceof HBaseRpcController)) {
3570      throw new UnsupportedOperationException(
3571        "We only do " + "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
3572    }
3573    if (!request.hasScannerId() && !request.hasScan()) {
3574      throw new ServiceException(
3575        new DoNotRetryIOException("Missing required input: scannerId or scan"));
3576    }
3577    try {
3578      checkOpen();
3579    } catch (IOException e) {
3580      if (request.hasScannerId()) {
3581        String scannerName = toScannerName(request.getScannerId());
3582        if (LOG.isDebugEnabled()) {
3583          LOG.debug(
3584            "Server shutting down and client tried to access missing scanner " + scannerName);
3585        }
3586        final LeaseManager leaseManager = server.getLeaseManager();
3587        if (leaseManager != null) {
3588          try {
3589            leaseManager.cancelLease(scannerName);
3590          } catch (LeaseException le) {
3591            // No problem, ignore
3592            if (LOG.isTraceEnabled()) {
3593              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
3594            }
3595          }
3596        }
3597      }
3598      throw new ServiceException(e);
3599    }
3600    requestCount.increment();
3601    rpcScanRequestCount.increment();
3602    RegionScannerContext rsx;
3603    ScanResponse.Builder builder = ScanResponse.newBuilder();
3604    try {
3605      rsx = checkQuotaAndGetRegionScannerContext(request, builder);
3606    } catch (IOException e) {
3607      if (e == SCANNER_ALREADY_CLOSED) {
3608        // Now we will close scanner automatically if there are no more results for this region but
3609        // the old client will still send a close request to us. Just ignore it and return.
3610        return builder.build();
3611      }
3612      throw new ServiceException(e);
3613    }
3614    String scannerName = rsx.scannerName;
3615    RegionScannerHolder rsh = rsx.holder;
3616    OperationQuota quota = rsx.quota;
3617    if (rsh.fullRegionScan) {
3618      rpcFullScanRequestCount.increment();
3619    }
3620    HRegion region = rsh.r;
3621    LeaseManager.Lease lease;
3622    try {
3623      // Remove lease while its being processed in server; protects against case
3624      // where processing of request takes > lease expiration time. or null if none found.
3625      lease = server.getLeaseManager().removeLease(scannerName);
3626    } catch (LeaseException e) {
3627      throw new ServiceException(e);
3628    }
3629    if (request.hasRenew() && request.getRenew()) {
3630      // add back and return
3631      addScannerLeaseBack(lease);
3632      try {
3633        checkScanNextCallSeq(request, rsh);
3634      } catch (OutOfOrderScannerNextException e) {
3635        throw new ServiceException(e);
3636      }
3637      return builder.build();
3638    }
3639    try {
3640      checkScanNextCallSeq(request, rsh);
3641    } catch (OutOfOrderScannerNextException e) {
3642      addScannerLeaseBack(lease);
3643      throw new ServiceException(e);
3644    }
3645    // Now we have increased the next call sequence. If we give client an error, the retry will
3646    // never success. So we'd better close the scanner and return a DoNotRetryIOException to client
3647    // and then client will try to open a new scanner.
3648    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
3649    int rows; // this is scan.getCaching
3650    if (request.hasNumberOfRows()) {
3651      rows = request.getNumberOfRows();
3652    } else {
3653      rows = closeScanner ? 0 : 1;
3654    }
3655    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
3656    // now let's do the real scan.
3657    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
3658    RegionScanner scanner = rsh.s;
3659    // this is the limit of rows for this scan, if we the number of rows reach this value, we will
3660    // close the scanner.
3661    int limitOfRows;
3662    if (request.hasLimitOfRows()) {
3663      limitOfRows = request.getLimitOfRows();
3664    } else {
3665      limitOfRows = -1;
3666    }
3667    boolean scannerClosed = false;
3668    try {
3669      List<Result> results = new ArrayList<>(Math.min(rows, 512));
3670      if (rows > 0) {
3671        boolean done = false;
3672        // Call coprocessor. Get region info from scanner.
3673        if (region.getCoprocessorHost() != null) {
3674          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
3675          if (!results.isEmpty()) {
3676            for (Result r : results) {
3677              // add cell size from CP results so we can track response size and update limits
3678              // when calling scan below if !done. We'll also have tracked block size if the CP
3679              // got results from hbase, since StoreScanner tracks that for all calls automatically.
3680              addSize(rpcCall, r);
3681            }
3682          }
3683          if (bypass != null && bypass.booleanValue()) {
3684            done = true;
3685          }
3686        }
3687        if (!done) {
3688          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
3689            results, builder, rpcCall);
3690        } else {
3691          builder.setMoreResultsInRegion(!results.isEmpty());
3692        }
3693      } else {
3694        // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
3695        builder.setMoreResultsInRegion(true);
3696      }
3697
3698      quota.addScanResult(results);
3699      addResults(builder, results, (HBaseRpcController) controller,
3700        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
3701        isClientCellBlockSupport(rpcCall));
3702      if (scanner.isFilterDone() && results.isEmpty()) {
3703        // If the scanner's filter - if any - is done with the scan
3704        // only set moreResults to false if the results is empty. This is used to keep compatible
3705        // with the old scan implementation where we just ignore the returned results if moreResults
3706        // is false. Can remove the isEmpty check after we get rid of the old implementation.
3707        builder.setMoreResults(false);
3708      }
3709      // Later we may close the scanner depending on this flag so here we need to make sure that we
3710      // have already set this flag.
3711      assert builder.hasMoreResultsInRegion();
3712      // we only set moreResults to false in the above code, so set it to true if we haven't set it
3713      // yet.
3714      if (!builder.hasMoreResults()) {
3715        builder.setMoreResults(true);
3716      }
3717      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
3718        // Record the last cell of the last result if it is a partial result
3719        // We need this to calculate the complete rows we have returned to client as the
3720        // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
3721        // current row. We may filter out all the remaining cells for the current row and just
3722        // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
3723        // check for row change.
3724        Result lastResult = results.get(results.size() - 1);
3725        if (lastResult.mayHaveMoreCellsInRow()) {
3726          rsh.rowOfLastPartialResult = lastResult.getRow();
3727        } else {
3728          rsh.rowOfLastPartialResult = null;
3729        }
3730      }
3731      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
3732        scannerClosed = true;
3733        closeScanner(region, scanner, scannerName, rpcCall, false);
3734      }
3735
3736      // There's no point returning to a timed out client. Throwing ensures scanner is closed
3737      if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) {
3738        throw new TimeoutIOException("Client deadline exceeded, cannot return results");
3739      }
3740
3741      return builder.build();
3742    } catch (IOException e) {
3743      try {
3744        // scanner is closed here
3745        scannerClosed = true;
3746        // The scanner state might be left in a dirty state, so we will tell the Client to
3747        // fail this RPC and close the scanner while opening up another one from the start of
3748        // row that the client has last seen.
3749        closeScanner(region, scanner, scannerName, rpcCall, true);
3750
3751        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
3752        // used in two different semantics.
3753        // (1) The first is to close the client scanner and bubble up the exception all the way
3754        // to the application. This is preferred when the exception is really un-recoverable
3755        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
3756        // bucket usually.
3757        // (2) Second semantics is to close the current region scanner only, but continue the
3758        // client scanner by overriding the exception. This is usually UnknownScannerException,
3759        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
3760        // application-level ClientScanner has to continue without bubbling up the exception to
3761        // the client. See ClientScanner code to see how it deals with these special exceptions.
3762        if (e instanceof DoNotRetryIOException) {
3763          throw e;
3764        }
3765
3766        // If it is a FileNotFoundException, wrap as a
3767        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
3768        if (e instanceof FileNotFoundException) {
3769          throw new DoNotRetryIOException(e);
3770        }
3771
3772        // We closed the scanner already. Instead of throwing the IOException, and client
3773        // retrying with the same scannerId only to get USE on the next RPC, we directly throw
3774        // a special exception to save an RPC.
3775        if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
3776          // 1.4.0+ clients know how to handle
3777          throw new ScannerResetException("Scanner is closed on the server-side", e);
3778        } else {
3779          // older clients do not know about SRE. Just throw USE, which they will handle
3780          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
3781            + " scanner state for clients older than 1.3.", e);
3782        }
3783      } catch (IOException ioe) {
3784        throw new ServiceException(ioe);
3785      }
3786    } finally {
3787      if (!scannerClosed) {
3788        // Adding resets expiration time on lease.
3789        // the closeCallBack will be set in closeScanner so here we only care about shippedCallback
3790        if (rpcCall != null) {
3791          rpcCall.setCallBack(rsh.shippedCallback);
3792        } else {
3793          // If context is null,here we call rsh.shippedCallback directly to reuse the logic in
3794          // rsh.shippedCallback to release the internal resources in rsh,and lease is also added
3795          // back to regionserver's LeaseManager in rsh.shippedCallback.
3796          runShippedCallback(rsh);
3797        }
3798      }
3799      quota.close();
3800    }
3801  }
3802
3803  private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
3804    assert rsh.shippedCallback != null;
3805    try {
3806      rsh.shippedCallback.run();
3807    } catch (IOException ioe) {
3808      throw new ServiceException(ioe);
3809    }
3810  }
3811
3812  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
3813    RpcCallContext context, boolean isError) throws IOException {
3814    if (region.getCoprocessorHost() != null) {
3815      if (region.getCoprocessorHost().preScannerClose(scanner)) {
3816        // bypass the actual close.
3817        return;
3818      }
3819    }
3820    RegionScannerHolder rsh = scanners.remove(scannerName);
3821    if (rsh != null) {
3822      if (context != null) {
3823        context.setCallBack(rsh.closeCallBack);
3824      } else {
3825        rsh.s.close();
3826      }
3827      if (region.getCoprocessorHost() != null) {
3828        region.getCoprocessorHost().postScannerClose(scanner);
3829      }
3830      if (!isError) {
3831        closedScanners.put(scannerName, rsh.getNextCallSeq());
3832      }
3833    }
3834  }
3835
3836  @Override
3837  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
3838    CoprocessorServiceRequest request) throws ServiceException {
3839    rpcPreCheck("execRegionServerService");
3840    return server.execRegionServerService(controller, request);
3841  }
3842
3843  @Override
3844  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
3845    GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
3846    try {
3847      final RegionServerSpaceQuotaManager manager = server.getRegionServerSpaceQuotaManager();
3848      final GetSpaceQuotaSnapshotsResponse.Builder builder =
3849        GetSpaceQuotaSnapshotsResponse.newBuilder();
3850      if (manager != null) {
3851        final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
3852        for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
3853          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
3854            .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
3855            .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build());
3856        }
3857      }
3858      return builder.build();
3859    } catch (Exception e) {
3860      throw new ServiceException(e);
3861    }
3862  }
3863
3864  @Override
3865  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
3866    ClearRegionBlockCacheRequest request) throws ServiceException {
3867    try {
3868      rpcPreCheck("clearRegionBlockCache");
3869      ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder();
3870      CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
3871      server.getRegionServerCoprocessorHost().preClearRegionBlockCache();
3872      List<HRegion> regions = getRegions(request.getRegionList(), stats);
3873      for (HRegion region : regions) {
3874        try {
3875          stats = stats.append(this.server.clearRegionBlockCache(region));
3876        } catch (Exception e) {
3877          stats.addException(region.getRegionInfo().getRegionName(), e);
3878        }
3879      }
3880      stats.withMaxCacheSize(server.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
3881      server.getRegionServerCoprocessorHost().postClearRegionBlockCache(stats.build());
3882      return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
3883    } catch (IOException e) {
3884      throw new ServiceException(e);
3885    }
3886  }
3887
3888  private void executeOpenRegionProcedures(OpenRegionRequest request,
3889    Map<TableName, TableDescriptor> tdCache) {
3890    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
3891    long initiatingMasterActiveTime =
3892      request.hasInitiatingMasterActiveTime() ? request.getInitiatingMasterActiveTime() : -1;
3893    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
3894      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
3895      TableName tableName = regionInfo.getTable();
3896      TableDescriptor tableDesc = tdCache.get(tableName);
3897      if (tableDesc == null) {
3898        try {
3899          tableDesc = server.getTableDescriptors().get(regionInfo.getTable());
3900        } catch (IOException e) {
3901          // Here we do not fail the whole method since we also need deal with other
3902          // procedures, and we can not ignore this one, so we still schedule a
3903          // AssignRegionHandler and it will report back to master if we still can not get the
3904          // TableDescriptor.
3905          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
3906            regionInfo.getTable(), e);
3907        }
3908        if (tableDesc != null) {
3909          tdCache.put(tableName, tableDesc);
3910        }
3911      }
3912      if (regionOpenInfo.getFavoredNodesCount() > 0) {
3913        server.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
3914          regionOpenInfo.getFavoredNodesList());
3915      }
3916      long procId = regionOpenInfo.getOpenProcId();
3917      if (server.submitRegionProcedure(procId)) {
3918        server.getExecutorService().submit(AssignRegionHandler.create(server, regionInfo, procId,
3919          tableDesc, masterSystemTime, initiatingMasterActiveTime));
3920      }
3921    }
3922  }
3923
3924  private void executeCloseRegionProcedures(CloseRegionRequest request) {
3925    String encodedName;
3926    long initiatingMasterActiveTime =
3927      request.hasInitiatingMasterActiveTime() ? request.getInitiatingMasterActiveTime() : -1;
3928    try {
3929      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
3930    } catch (DoNotRetryIOException e) {
3931      throw new UncheckedIOException("Should not happen", e);
3932    }
3933    ServerName destination = request.hasDestinationServer()
3934      ? ProtobufUtil.toServerName(request.getDestinationServer())
3935      : null;
3936    long procId = request.getCloseProcId();
3937    boolean evictCache = request.getEvictCache();
3938    if (server.submitRegionProcedure(procId)) {
3939      server.getExecutorService().submit(UnassignRegionHandler.create(server, encodedName, procId,
3940        false, destination, evictCache, initiatingMasterActiveTime));
3941    }
3942  }
3943
3944  private void executeProcedures(RemoteProcedureRequest request) {
3945    RSProcedureCallable callable;
3946    try {
3947      callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
3948        .getDeclaredConstructor().newInstance();
3949    } catch (Exception e) {
3950      LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(),
3951        request.getProcId(), e);
3952      server.remoteProcedureComplete(request.getProcId(), request.getInitiatingMasterActiveTime(),
3953        e);
3954      return;
3955    }
3956    callable.init(request.getProcData().toByteArray(), server);
3957    LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
3958    server.executeProcedure(request.getProcId(), request.getInitiatingMasterActiveTime(), callable);
3959  }
3960
3961  @Override
3962  @QosPriority(priority = HConstants.ADMIN_QOS)
3963  public ExecuteProceduresResponse executeProcedures(RpcController controller,
3964    ExecuteProceduresRequest request) throws ServiceException {
3965    try {
3966      checkOpen();
3967      throwOnWrongStartCode(request);
3968      server.getRegionServerCoprocessorHost().preExecuteProcedures();
3969      if (request.getOpenRegionCount() > 0) {
3970        // Avoid reading from the TableDescritor every time(usually it will read from the file
3971        // system)
3972        Map<TableName, TableDescriptor> tdCache = new HashMap<>();
3973        request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
3974      }
3975      if (request.getCloseRegionCount() > 0) {
3976        request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
3977      }
3978      if (request.getProcCount() > 0) {
3979        request.getProcList().forEach(this::executeProcedures);
3980      }
3981      server.getRegionServerCoprocessorHost().postExecuteProcedures();
3982      return ExecuteProceduresResponse.getDefaultInstance();
3983    } catch (IOException e) {
3984      throw new ServiceException(e);
3985    }
3986  }
3987
3988  @Override
3989  public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller,
3990    GetAllBootstrapNodesRequest request) throws ServiceException {
3991    GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder();
3992    server.getBootstrapNodes()
3993      .forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server)));
3994    return builder.build();
3995  }
3996
3997  private void setReloadableGuardrails(Configuration conf) {
3998    rowSizeWarnThreshold =
3999      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
4000    rejectRowsWithSizeOverThreshold =
4001      conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
4002    maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
4003      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
4004  }
4005
  /**
   * Applies a runtime configuration change: delegates to the superclass first, then refreshes the
   * reloadable guardrail settings from the new configuration.
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    super.onConfigurationChange(conf);
    setReloadableGuardrails(conf);
  }
4011
4012  @Override
4013  public GetCachedFilesListResponse getCachedFilesList(RpcController controller,
4014    GetCachedFilesListRequest request) throws ServiceException {
4015    GetCachedFilesListResponse.Builder responseBuilder = GetCachedFilesListResponse.newBuilder();
4016    List<String> fullyCachedFiles = new ArrayList<>();
4017    server.getBlockCache().flatMap(BlockCache::getFullyCachedFiles).ifPresent(fcf -> {
4018      fullyCachedFiles.addAll(fcf.keySet());
4019    });
4020    return responseBuilder.addAllCachedFiles(fullyCachedFiles).build();
4021  }
4022
4023  RegionScannerContext checkQuotaAndGetRegionScannerContext(ScanRequest request,
4024    ScanResponse.Builder builder) throws IOException {
4025    if (request.hasScannerId()) {
4026      // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
4027      // for more details.
4028      long scannerId = request.getScannerId();
4029      builder.setScannerId(scannerId);
4030      String scannerName = toScannerName(scannerId);
4031      RegionScannerHolder rsh = getRegionScanner(request);
4032      OperationQuota quota =
4033        getRpcQuotaManager().checkScanQuota(rsh.r, request, maxScannerResultSize,
4034          rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference());
4035      return new RegionScannerContext(scannerName, rsh, quota);
4036    }
4037
4038    HRegion region = getRegion(request.getRegion());
4039    OperationQuota quota =
4040      getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize, 0L, 0L);
4041    Pair<String, RegionScannerHolder> pair = newRegionScanner(request, region, builder);
4042    return new RegionScannerContext(pair.getFirst(), pair.getSecond(), quota);
4043  }
4044}