/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.ExtendedCellScannable;
import org.apache.hadoop.hbase.ExtendedCellScanner;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseRpcServicesBase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.QosPriority;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
import org.apache.hadoop.hbase.quotas.OperationQuota;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease;
import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.handler.AssignRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker;
import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.DNS.ServerType;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;

/**
 * Implements the regionserver RPC services.
 */
@InterfaceAudience.Private
public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
  implements ClientService.BlockingInterface, BootstrapNodeService.BlockingInterface {

  private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);

  /** RPC scheduler to use for the region server. */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
    "hbase.region.server.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
    "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
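  // Editor's illustrative sketch (not part of the original source): the delta above is read from
  // Configuration, so an operator who sees scan time limits tripping before any cell is returned
  // could raise it, e.g. in hbase-site.xml or programmatically:
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setLong("hbase.region.server.rpc.minimum.scan.time.limit.delta", 100); // assumed value
  // Only the key name comes from the constant above; the value 100 ms is hypothetical, not a
  // recommendation.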

  /**
   * Whether to reject batch requests whose number of rows exceeds the threshold defined by
   * {@link HConstants#BATCH_ROWS_THRESHOLD_NAME}
   */
  private static final String REJECT_BATCH_ROWS_OVER_THRESHOLD =
    "hbase.rpc.rows.size.threshold.reject";

  /**
   * Default value of config {@link RSRpcServices#REJECT_BATCH_ROWS_OVER_THRESHOLD}
   */
  private static final boolean DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD = false;
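  // Editor's illustrative sketch (not part of the original source): both the row-count threshold
  // and the reject switch are plain Configuration entries, so a hedged example of turning
  // rejection on (instead of only warning) might look like:
  //   conf.setInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, 5000);       // hypothetical threshold
  //   conf.setBoolean("hbase.rpc.rows.size.threshold.reject", true); // reject oversized batches
  // Only the key names come from the constants referenced above; the numeric value is made up.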

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for scans that might end up in full scans
  final LongAdder rpcFullScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  private volatile long maxScannerResultSize;

  private ScannerIdGenerator scannerIdGenerator;
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name and last sequence number of a closed scanner for a while. This is used
  // to stay compatible with old clients which may send a next or close request to a region
  // scanner which has already been exhausted. The entries will be removed automatically
  // after scannerLeaseTimeoutPeriod.
  private final Cache<String, Long> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private volatile int rowSizeWarnThreshold;
  /*
   * Whether we should reject requests with a very high number of rows, i.e. beyond the threshold
   * defined by rowSizeWarnThreshold
   */
  private volatile boolean rejectRowsWithSizeOverThreshold;

  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  /**
   * Services launched in RSRpcServices. By default they are on, but you can use the booleans
   * below to selectively enable/disable these services (it is rare that you would ever need to
   * turn one of them off).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
    "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
    "hbase.regionserver.client.executorService";
  public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
    "hbase.regionserver.client.meta.executorService";
  public static final String REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG =
    "hbase.regionserver.bootstrap.nodes.executorService";
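  // Editor's illustrative sketch (not part of the original source): each of the four service
  // flags above is a boolean Configuration entry that defaults to on, so disabling one would be
  // a one-line config change, e.g.
  //   conf.setBoolean(REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG, false); // hypothetical
  // As the javadoc above notes, it is rare to ever need to do this.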

  /**
   * An Rpc callback for closing a RegionScanner.
   */
  private static final class RegionScannerCloseCallBack implements RpcCallback {

    private final RegionScanner scanner;

    public RegionScannerCloseCallBack(RegionScanner scanner) {
      this.scanner = scanner;
    }

    @Override
    public void run() throws IOException {
      this.scanner.close();
    }
  }

  /**
   * An Rpc callback for doing shipped() call on a RegionScanner.
   */
  private class RegionScannerShippedCallBack implements RpcCallback {
    private final String scannerName;
    private final Shipper shipper;
    private final Lease lease;

    public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
      this.scannerName = scannerName;
      this.shipper = shipper;
      this.lease = lease;
    }

    @Override
    public void run() throws IOException {
      this.shipper.shipped();
      // We're done. On way out re-add the above removed lease. The lease was temp removed for this
      // Rpc call and we are at end of the call now. Time to add it back.
      if (scanners.containsKey(scannerName)) {
        if (lease != null) {
          server.getLeaseManager().addLease(lease);
        }
      }
    }
  }

  /**
   * An RpcCallback that collects a list of scanners on which a callback (close) operation must be
   * performed on completion of multiGets.
   */
  static class RegionScannersCloseCallBack implements RpcCallback {
    private final List<RegionScanner> scanners = new ArrayList<>();

    public void addScanner(RegionScanner scanner) {
      this.scanners.add(scanner);
    }

    @Override
    public void run() {
      for (RegionScanner scanner : scanners) {
        try {
          scanner.close();
        } catch (IOException e) {
          LOG.error("Exception while closing the scanner " + scanner, e);
        }
      }
    }
  }

  static class RegionScannerContext {
    final String scannerName;
    final RegionScannerHolder holder;
    final OperationQuota quota;

    RegionScannerContext(String scannerName, RegionScannerHolder holder, OperationQuota quota) {
      this.scannerName = scannerName;
      this.holder = holder;
      this.quota = quota;
    }
  }

  /**
   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
   */
  static final class RegionScannerHolder {
    private final AtomicLong nextCallSeq = new AtomicLong(0);
    private final RegionScanner s;
    private final HRegion r;
    private final RpcCallback closeCallBack;
    private final RpcCallback shippedCallback;
    private byte[] rowOfLastPartialResult;
    private boolean needCursor;
    private boolean fullRegionScan;
    private final String clientIPAndPort;
    private final String userName;
    private volatile long maxBlockBytesScanned = 0;
    private volatile long prevBlockBytesScanned = 0;
    private volatile long prevBlockBytesScannedDifference = 0;

    RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack,
      RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan,
      String clientIPAndPort, String userName) {
      this.s = s;
      this.r = r;
      this.closeCallBack = closeCallBack;
      this.shippedCallback = shippedCallback;
      this.needCursor = needCursor;
      this.fullRegionScan = fullRegionScan;
      this.clientIPAndPort = clientIPAndPort;
      this.userName = userName;
    }

    long getNextCallSeq() {
      return nextCallSeq.get();
    }

    boolean incNextCallSeq(long currentSeq) {
      // Use CAS to prevent multiple scan requests from running on the same scanner.
      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
    }
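    // Editor's illustrative sketch (not part of the original source): the CAS above is meant to
    // be used as a guard before serving a scan round, along the lines of
    //   if (!rsh.incNextCallSeq(request.getNextCallSeq())) {
    //     throw new OutOfOrderScannerNextException("..."); // hypothetical handling
    //   }
    // so that two concurrent requests carrying the same sequence number cannot both advance the
    // same scanner.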

    long getMaxBlockBytesScanned() {
      return maxBlockBytesScanned;
    }

    long getPrevBlockBytesScannedDifference() {
      return prevBlockBytesScannedDifference;
    }

    void updateBlockBytesScanned(long blockBytesScanned) {
      prevBlockBytesScannedDifference = blockBytesScanned - prevBlockBytesScanned;
      prevBlockBytesScanned = blockBytesScanned;
      if (blockBytesScanned > maxBlockBytesScanned) {
        maxBlockBytesScanned = blockBytesScanned;
      }
    }

    // Should be called only when we need to print lease-expired messages; otherwise, cache the
    // String once it has been made.
    @Override
    public String toString() {
      return "clientIPAndPort=" + this.clientIPAndPort + ", userName=" + this.userName
        + ", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString();
    }
  }

  /**
   * Instantiated as a scanner lease. If the lease times out, the scanner is closed
   */
  private class ScannerListener implements LeaseListener {
    private final String scannerName;

    ScannerListener(final String n) {
      this.scannerName = n;
    }

    @Override
    public void leaseExpired() {
      RegionScannerHolder rsh = scanners.remove(this.scannerName);
      if (rsh == null) {
        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
        return;
      }
      LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
      server.getMetrics().incrScannerLeaseExpired();
      RegionScanner s = rsh.s;
      HRegion region = null;
      try {
        region = server.getRegion(s.getRegionInfo().getRegionName());
        if (region != null && region.getCoprocessorHost() != null) {
          region.getCoprocessorHost().preScannerClose(s);
        }
      } catch (IOException e) {
        LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
      } finally {
        try {
          s.close();
          if (region != null && region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().postScannerClose(s);
          }
        } catch (IOException e) {
          LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
        }
      }
    }
  }

  private static ResultOrException getResultOrException(final ClientProtos.Result r,
    final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(r), index);
  }

  private static ResultOrException getResultOrException(final Exception e, final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(e), index);
  }

  private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
    final int index) {
    return builder.setIndex(index).build();
  }

  /**
   * Performs the following pre-checks in order:
   * <ol>
   * <li>RegionServer is running</li>
   * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li>
   * </ol>
   * @param requestName name of rpc request. Used in reporting failures to provide context.
   * @throws ServiceException If any of the above listed pre-checks fails.
   */
  private void rpcPreCheck(String requestName) throws ServiceException {
    try {
      checkOpen();
      requirePermission(requestName, Permission.Action.ADMIN);
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
  }

  private boolean isClientCellBlockSupport(RpcCallContext context) {
    return context != null && context.isClientCellBlockSupported();
  }

  private void addResult(final MutateResponse.Builder builder, final Result result,
    final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
    if (result == null) return;
    if (clientCellBlockSupported) {
      builder.setResult(ProtobufUtil.toResultNoData(result));
      rpcc.setCellScanner(result.cellScanner());
    } else {
      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
      builder.setResult(pbr);
    }
  }

  private void addResults(ScanResponse.Builder builder, List<Result> results,
    HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
    builder.setStale(!isDefaultRegion);
    if (results.isEmpty()) {
      return;
    }
    if (clientCellBlockSupported) {
      for (Result res : results) {
        builder.addCellsPerResult(res.size());
        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
      }
      controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(results));
    } else {
      for (Result res : results) {
        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
        builder.addResults(pbr);
      }
    }
  }

  private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
    CellScanner cellScanner, Condition condition, long nonceGroup,
    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }
      List<Mutation> mutations = new ArrayList<>();
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : actions) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto mutation = action.getMutation();
        MutationType type = mutation.getMutateType();
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(mutation, cellScanner);
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            mutations.add(put);
            break;
          case DELETE:
            Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
            ++countOfCompleteMutation;
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            mutations.add(del);
            break;
          case INCREMENT:
            Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, increment);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
            mutations.add(increment);
            break;
          case APPEND:
            Append append = ProtobufUtil.toAppend(mutation, cellScanner);
            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, append);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
            mutations.add(append);
            break;
          default:
            throw new DoNotRetryIOException("invalid mutation type : " + type);
        }
      }

      if (mutations.size() == 0) {
        return new CheckAndMutateResult(true, null);
      } else {
        CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
        CheckAndMutateResult result = null;
        if (region.getCoprocessorHost() != null) {
          result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
        }
        if (result == null) {
          result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
          if (region.getCoprocessorHost() != null) {
            result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
          }
        }
        return result;
      }
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }

  /**
   * Execute an append mutation.
   * @return result to return to client if default operation should be bypassed as indicated by
   *         RegionObserver, null otherwise
   */
  private Result append(final HRegion region, final OperationQuota quota,
    final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
    checkCellSizeLimit(region, append);
    spaceQuota.getPolicyEnforcement(region).check(append);
    quota.addMutation(append);
    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    Result r = region.append(append, nonceGroup, nonce);
    if (server.getMetrics() != null) {
      long blockBytesScanned =
        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
      server.getMetrics().updateAppend(region, EnvironmentEdgeManager.currentTime() - before,
        blockBytesScanned);
    }
    return r == null ? Result.EMPTY_RESULT : r;
  }

  /**
   * Execute an increment mutation.
   */
  private Result increment(final HRegion region, final OperationQuota quota,
    final MutationProto mutation, final CellScanner cells, long nonceGroup,
    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
    checkCellSizeLimit(region, increment);
    spaceQuota.getPolicyEnforcement(region).check(increment);
    quota.addMutation(increment);
    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
    Result r = region.increment(increment, nonceGroup, nonce);
    final MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      long blockBytesScanned =
        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
      metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before,
        blockBytesScanned);
    }
    return r == null ? Result.EMPTY_RESULT : r;
  }

  /**
   * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
   * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
   * @param cellsToReturn Could be null. May be allocated in this method. This is what this method
   *                      returns as a 'result'.
   * @param closeCallBack the callback to be used with multigets
   * @param context       the current RpcCallContext
   * @return the <code>cellsToReturn</code> list, possibly allocated in this method
   */
  private List<ExtendedCellScannable> doNonAtomicRegionMutation(final HRegion region,
    final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
    final RegionActionResult.Builder builder, List<ExtendedCellScannable> cellsToReturn,
    long nonceGroup, final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    // Gather up CONTIGUOUS Puts and Deletes in this mutations List. The idea is that rather than
    // do them one at a time, we instead pass them in batch. Be aware that the corresponding
    // ResultOrException instance that matches each Put or Delete is then added down in the
    // doNonAtomicBatchOp call. We should stay aligned even though the Puts and Deletes are
    // deferred/batched
    List<ClientProtos.Action> mutations = null;
    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
    IOException sizeIOE = null;
    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
      ResultOrException.newBuilder();
    boolean hasResultOrException = false;
    for (ClientProtos.Action action : actions.getActionList()) {
      hasResultOrException = false;
      resultOrExceptionBuilder.clear();
      try {
        Result r = null;
        long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
        if (
          context != null && context.isRetryImmediatelySupported()
            && (context.getResponseCellSize() > maxQuotaResultSize
              || blockBytesScannedBefore + context.getResponseExceptionSize() > maxQuotaResultSize)
        ) {

          // We're storing the exception since the exception and reason string won't
          // change after the response size limit is reached.
          if (sizeIOE == null) {
            // We don't need the stack un-winding so don't throw the exception.
            // Throwing will kill the JVM's JIT.
            //
            // Instead just create the exception and then store it.
            sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
              + context.getResponseCellSize() + " BlockSize: " + blockBytesScannedBefore);

            // Only report the exception once since there's only one request that
            // caused the exception. Otherwise this number will dominate the exceptions count.
            rpcServer.getMetrics().exception(sizeIOE);
          }

          // Now that the exception is known to have been created,
          // use it for the response.
          //
          // This will create a copy in the builder.
          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
          resultOrExceptionBuilder.setException(pair);
          context.incrementResponseExceptionSize(pair.getSerializedSize());
          resultOrExceptionBuilder.setIndex(action.getIndex());
          builder.addResultOrException(resultOrExceptionBuilder.build());
          skipCellsForMutation(action, cellScanner);
          continue;
        }
        if (action.hasGet()) {
          long before = EnvironmentEdgeManager.currentTime();
          ClientProtos.Get pbGet = action.getGet();
          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
          // to switch and do the hbase2 protocol (HBase servers do not tell clients what versions
          // they are; it's a problem for non-native clients like asynchbase). HBASE-20225.
          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
              + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
              + "reverse Scan.");
          }
          try {
            Get get = ProtobufUtil.toGet(pbGet);
            if (context != null) {
              r = get(get, (region), closeCallBack, context);
            } else {
              r = region.get(get);
            }
          } finally {
            final MetricsRegionServer metricsRegionServer = server.getMetrics();
            if (metricsRegionServer != null) {
              long blockBytesScanned =
                context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
              metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
                blockBytesScanned);
            }
          }
        } else if (action.hasServiceCall()) {
          hasResultOrException = true;
          Message result = execServiceOnRegion(region, action.getServiceCall());
          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
            ClientProtos.CoprocessorServiceResult.newBuilder();
          resultOrExceptionBuilder.setServiceResult(serviceResultBuilder
            .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName())
              // TODO: Copy!!!
              .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
        } else if (action.hasMutation()) {
          MutationType type = action.getMutation().getMutateType();
          if (
            type != MutationType.PUT && type != MutationType.DELETE && mutations != null
              && !mutations.isEmpty()
          ) {
            // Flush out any Puts or Deletes already collected.
            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
              spaceQuotaEnforcement);
            mutations.clear();
          }
          switch (type) {
            case APPEND:
              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement, context);
              break;
            case INCREMENT:
              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement, context);
              break;
            case PUT:
            case DELETE:
              // Collect the individual mutations and apply in a batch
              if (mutations == null) {
                mutations = new ArrayList<>(actions.getActionCount());
              }
              mutations.add(action);
              break;
            default:
              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
          }
        } else {
          throw new HBaseIOException("Unexpected Action type");
        }
        if (r != null) {
          ClientProtos.Result pbResult = null;
          if (isClientCellBlockSupport(context)) {
            pbResult = ProtobufUtil.toResultNoData(r);
            // Hard to guess the size here. Just make a rough guess.
            if (cellsToReturn == null) {
              cellsToReturn = new ArrayList<>();
            }
            cellsToReturn.add(r);
          } else {
            pbResult = ProtobufUtil.toResult(r);
          }
          addSize(context, r);
          hasResultOrException = true;
          resultOrExceptionBuilder.setResult(pbResult);
        }
        // Could get to here and there was no result and no exception. Presumes we added
        // a Put or Delete to the collecting Mutations List for adding later. In this
        // case the corresponding ResultOrException instance for the Put or Delete will be added
        // down in the doNonAtomicBatchOp method call rather than up here.
      } catch (IOException ie) {
        rpcServer.getMetrics().exception(ie);
        hasResultOrException = true;
        NameBytesPair pair = ResponseConverter.buildException(ie);
        resultOrExceptionBuilder.setException(pair);
        context.incrementResponseExceptionSize(pair.getSerializedSize());
      }
      if (hasResultOrException) {
        // Propagate index.
        resultOrExceptionBuilder.setIndex(action.getIndex());
        builder.addResultOrException(resultOrExceptionBuilder.build());
      }
    }
    // Finish up any outstanding mutations
    if (!CollectionUtils.isEmpty(mutations)) {
      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
    }
    return cellsToReturn;
  }

  private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException {
    if (r.maxCellSize > 0) {
      CellScanner cells = m.cellScanner();
      while (cells.advance()) {
        int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current());
        if (size > r.maxCellSize) {
          String msg = "Cell[" + cells.current() + "] with size " + size + " exceeds limit of "
            + r.maxCellSize + " bytes";
          LOG.debug(msg);
          throw new DoNotRetryIOException(msg);
        }
      }
    }
  }

  private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Just throw the exception. The exception will be caught and then added to the region-level
    // exception for the RegionAction. Leaving null as the action result is ok since the null
    // result is viewed as a failure by the hbase client. And the region-level exception will be
    // used to replace the null result. See AsyncRequestFutureImpl#receiveMultiAction and
    // AsyncBatchRpcRetryingCaller#onComplete for more details.
    doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
  }

  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    try {
      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
        spaceQuotaEnforcement, false);
    } catch (IOException e) {
      // Set the exception for each action. The mutations in the same RegionAction are grouped
      // into different batches and then processed individually. Hence, we don't set the
      // region-level exception here for the whole RegionAction.
      for (Action mutation : mutations) {
        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
      }
    }
  }

  /**
   * Execute a list of mutations.
   */
  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
    final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells,
    long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
    throws IOException {
    Mutation[] mArray = new Mutation[mutations.size()];
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      /**
       * HBASE-17924 mutationActionMap is a map that records the relation between mutations and
       * actions, since the mutation array may have been reordered. In order to return the right
       * result or exception to the corresponding actions, we need to know which action each
       * mutation belongs to. We can't sort the ClientProtos.Action array, since they are bound to
       * cell scanners.
       */
      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
      int i = 0;
      long nonce = HConstants.NO_NONCE;
      for (ClientProtos.Action action : mutations) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException(
            "Atomic put and/or delete only, not a Get=" + action.getGet());
        }
        MutationProto m = action.getMutation();
        Mutation mutation;
        switch (m.getMutateType()) {
          case PUT:
            mutation = ProtobufUtil.toPut(m, cells);
            batchContainsPuts = true;
            break;

          case DELETE:
            mutation = ProtobufUtil.toDelete(m, cells);
            batchContainsDelete = true;
            break;

          case INCREMENT:
            mutation = ProtobufUtil.toIncrement(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          case APPEND:
            mutation = ProtobufUtil.toAppend(m, cells);
            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
            break;

          default:
            throw new DoNotRetryIOException("Invalid mutation type : " + m.getMutateType());
        }
        mutationActionMap.put(mutation, action);
        mArray[i++] = mutation;
        checkCellSizeLimit(region, mutation);
        // Check if a space quota disallows this mutation
        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
        quota.addMutation(mutation);
      }

      if (!region.getRegionInfo().isMetaRegion()) {
        server.getMemStoreFlusher().reclaimMemStoreMemory();
      }

      // HBASE-17924
      // Sort to improve lock efficiency for a non-atomic batch of operations. If atomic, the
      // order is preserved as it is expected by the client
      if (!atomic) {
        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
      }
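      // Editor's illustrative note (not part of the original source): sorting by Row.COMPARATOR
      // means concurrent non-atomic batches acquire their row locks in a consistent order (e.g. a
      // batch touching rows {b, a} and one touching {a, c} both lock "a" first), which is what the
      // "lock efficiency" comment above refers to; for atomic batches the client-supplied order
      // must be kept, hence the guard.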

      OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);

      // When atomic is true, it indicates that the mutateRow API or the batch API with
      // RowMutations is called. In this case, we need to merge the results of the
      // Increment/Append operations if the mutations include those operations, and set the merged
      // result to the first element of the ResultOrException list
      if (atomic) {
        List<ResultOrException> resultOrExceptions = new ArrayList<>();
        List<Result> results = new ArrayList<>();
        for (i = 0; i < codes.length; i++) {
          if (codes[i].getResult() != null) {
            results.add(codes[i].getResult());
          }
          if (i != 0) {
            resultOrExceptions
              .add(getResultOrException(ClientProtos.Result.getDefaultInstance(), i));
          }
        }

        if (results.isEmpty()) {
          builder.addResultOrException(
            getResultOrException(ClientProtos.Result.getDefaultInstance(), 0));
        } else {
          // Merge the results of the Increment/Append operations
          List<Cell> cellList = new ArrayList<>();
          for (Result result : results) {
            if (result.rawCells() != null) {
              cellList.addAll(Arrays.asList(result.rawCells()));
            }
          }
          Result result = Result.create(cellList);

          // Set the merged result of the Increment/Append operations to the first element of the
          // ResultOrException list
          builder.addResultOrException(getResultOrException(ProtobufUtil.toResult(result), 0));
        }

        builder.addAllResultOrException(resultOrExceptions);
        return;
      }

      for (i = 0; i < codes.length; i++) {
        Mutation currentMutation = mArray[i];
        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
        int index = currentAction.hasIndex() ? currentAction.getIndex() : i;
        Exception e;
        switch (codes[i].getOperationStatusCode()) {
          case BAD_FAMILY:
            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SANITY_CHECK_FAILURE:
            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          default:
            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SUCCESS:
            ClientProtos.Result result = codes[i].getResult() == null
              ? ClientProtos.Result.getDefaultInstance()
              : ProtobufUtil.toResult(codes[i].getResult());
            builder.addResultOrException(getResultOrException(result, index));
            break;

          case STORE_TOO_BUSY:
            e = new RegionTooBusyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;
        }
      }
    } finally {
      int processedMutationIndex = 0;
      for (Action mutation : mutations) {
        // The non-null mArray[i] means the cell scanner has been read.
        if (mArray[processedMutationIndex++] == null) {
          skipCellsForMutation(mutation, cells);
        }
      }
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }
1106
1107  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
1108    boolean batchContainsDelete) {
1109    final MetricsRegionServer metricsRegionServer = server.getMetrics();
1110    if (metricsRegionServer != null) {
1111      long after = EnvironmentEdgeManager.currentTime();
1112      if (batchContainsPuts) {
1113        metricsRegionServer.updatePutBatch(region, after - starttime);
1114      }
1115      if (batchContainsDelete) {
1116        metricsRegionServer.updateDeleteBatch(region, after - starttime);
1117      }
1118    }
1119  }
1120
1121  /**
1122   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
1123   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
1124   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
1125   *         exceptionMessage if any
1126   * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying
1127   *             edits for secondary replicas any more, see
1128   *             {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}.
1129   */
1130  @Deprecated
1131  private OperationStatus[] doReplayBatchOp(final HRegion region,
1132    final List<MutationReplay> mutations, long replaySeqId) throws IOException {
1133    long before = EnvironmentEdgeManager.currentTime();
1134    boolean batchContainsPuts = false, batchContainsDelete = false;
1135    try {
1136      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
1137        MutationReplay m = it.next();
1138
1139        if (m.getType() == MutationType.PUT) {
1140          batchContainsPuts = true;
1141        } else {
1142          batchContainsDelete = true;
1143        }
1144
1145        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
1146        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
1147        if (metaCells != null && !metaCells.isEmpty()) {
1148          for (Cell metaCell : metaCells) {
1149            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
            if (compactionDesc != null) {
              // Replay the compaction. Remove the files from stores only if we are the primary
              // region replica (thus own the files).
              region.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
                replaySeqId);
              continue;
            }
            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
            if (flushDesc != null && !isDefaultReplica) {
              region.replayWALFlushMarker(flushDesc, replaySeqId);
              continue;
            }
            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
            if (regionEvent != null && !isDefaultReplica) {
              region.replayWALRegionEventMarker(regionEvent);
              continue;
            }
            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
            if (bulkLoadEvent != null) {
              region.replayWALBulkLoadEventMarker(bulkLoadEvent);
              continue;
            }
1174          }
1175          it.remove();
1176        }
1177      }
1178      requestCount.increment();
1179      if (!region.getRegionInfo().isMetaRegion()) {
1180        server.getMemStoreFlusher().reclaimMemStoreMemory();
1181      }
1182      return region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
1183        replaySeqId);
1184    } finally {
1185      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
1186    }
1187  }
1188
1189  private void closeAllScanners() {
    // Close any outstanding scanners. Clients will get an UnknownScannerException the next time
    // they come in.
1192    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
1193      try {
1194        e.getValue().s.close();
1195      } catch (IOException ioe) {
1196        LOG.warn("Closing scanner " + e.getKey(), ioe);
1197      }
1198    }
1199  }
1200
1201  // Directly invoked only for testing
1202  public RSRpcServices(final HRegionServer rs) throws IOException {
1203    super(rs, rs.getProcessName());
1204    final Configuration conf = rs.getConfiguration();
1205    setReloadableGuardrails(conf);
1206    scannerLeaseTimeoutPeriod = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
1207      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
1208    rpcTimeout =
1209      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
1210    minimumScanTimeLimitDelta = conf.getLong(REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
1211      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
1212    rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
1213    closedScanners = CacheBuilder.newBuilder()
1214      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
1215  }
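
  // The scanner lease timeout above comes from HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD
  // (hbase.client.scanner.timeout.period). A minimal tuning sketch, with an illustrative value
  // only, not a recommendation:
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 120_000); // 2 minutes
  // Scanners that are not renewed within this period are expired by the lease manager and closed.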
1216
1217  @Override
1218  protected boolean defaultReservoirEnabled() {
1219    return true;
1220  }
1221
1222  @Override
1223  protected ServerType getDNSServerType() {
1224    return DNS.ServerType.REGIONSERVER;
1225  }
1226
1227  @Override
1228  protected String getHostname(Configuration conf, String defaultHostname) {
1229    return conf.get("hbase.regionserver.ipc.address", defaultHostname);
1230  }
1231
1232  @Override
1233  protected String getPortConfigName() {
1234    return HConstants.REGIONSERVER_PORT;
1235  }
1236
1237  @Override
1238  protected int getDefaultPort() {
1239    return HConstants.DEFAULT_REGIONSERVER_PORT;
1240  }
1241
1242  @Override
1243  protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
1244    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
1245      SimpleRpcSchedulerFactory.class);
1246  }
1247
1248  protected RpcServerInterface createRpcServer(final Server server,
1249    final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
1250    final String name) throws IOException {
1251    final Configuration conf = server.getConfiguration();
1252    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
1253    try {
      // Use the final bindAddress for this server.
      return RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, conf,
        rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
1259    } catch (BindException be) {
1260      throw new IOException(be.getMessage() + ". To switch ports use the '"
1261        + HConstants.REGIONSERVER_PORT + "' configuration property.",
1262        be.getCause() != null ? be.getCause() : be);
1263    }
1264  }
1265
1266  protected Class<?> getRpcSchedulerFactoryClass() {
1267    final Configuration conf = server.getConfiguration();
1268    return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
1269      SimpleRpcSchedulerFactory.class);
1270  }
1271
1272  protected PriorityFunction createPriority() {
1273    return new RSAnnotationReadingPriorityFunction(this);
1274  }
1275
1276  public int getScannersCount() {
1277    return scanners.size();
1278  }
1279
  /** Returns the outstanding RegionScanner for <code>scannerId</code>, or null if none found. */
1281  RegionScanner getScanner(long scannerId) {
1282    RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId);
1283    return rsh == null ? null : rsh.s;
1284  }
1285
  /** Returns the associated RegionScannerHolder for <code>scannerId</code>, or null. */
1287  private RegionScannerHolder checkQuotaAndGetRegionScannerContext(long scannerId) {
1288    return scanners.get(toScannerName(scannerId));
1289  }
1290
1291  public String getScanDetailsWithId(long scannerId) {
1292    RegionScanner scanner = getScanner(scannerId);
1293    if (scanner == null) {
1294      return null;
1295    }
1296    StringBuilder builder = new StringBuilder();
1297    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
1298    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
1299    builder.append(" operation_id: ").append(scanner.getOperationId());
1300    return builder.toString();
1301  }
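
  // The string built above has the shape (values hypothetical):
  //   "table: <table name> region: <region name> operation_id: <operation id>"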
1302
1303  public String getScanDetailsWithRequest(ScanRequest request) {
1304    try {
1305      if (!request.hasRegion()) {
1306        return null;
1307      }
1308      Region region = getRegion(request.getRegion());
1309      StringBuilder builder = new StringBuilder();
1310      builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
1311      builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
1312      for (NameBytesPair pair : request.getScan().getAttributeList()) {
1313        if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
1314          builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
1315          break;
1316        }
1317      }
1318      return builder.toString();
1319    } catch (IOException ignored) {
1320      return null;
1321    }
1322  }
1323
1324  /**
1325   * Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
1326   */
1327  long getScannerVirtualTime(long scannerId) {
1328    RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId);
1329    return rsh == null ? 0L : rsh.getNextCallSeq();
1330  }
1331
  /**
   * Method to account for the size of retained cells.
   * @param context rpc call context
   * @param r       result whose cell sizes are added to the response size accounting
   */
1338  void addSize(RpcCallContext context, Result r) {
1339    if (context != null && r != null && !r.isEmpty()) {
1340      for (Cell c : r.rawCells()) {
1341        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));
1342      }
1343    }
1344  }
1345
  /** Returns the remote client's ip and port, or an empty string if they can't be determined. */
1347  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
1348      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
1349  static String getRemoteClientIpAndPort() {
1350    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
1351    if (rpcCall == null) {
1352      return HConstants.EMPTY_STRING;
1353    }
1354    InetAddress address = rpcCall.getRemoteAddress();
1355    if (address == null) {
1356      return HConstants.EMPTY_STRING;
1357    }
1358    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
1359    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
1360    // scanning than a hostname anyways.
1361    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
1362  }
1363
  /** Returns the remote client's username, or an empty string if it can't be determined. */
1365  @RestrictedApi(explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
1366      link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
1367  static String getUserName() {
1368    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
1369    if (rpcCall == null) {
1370      return HConstants.EMPTY_STRING;
1371    }
1372    return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING);
1373  }
1374
1375  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
1376    HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
1377    Lease lease = server.getLeaseManager().createLease(scannerName, this.scannerLeaseTimeoutPeriod,
1378      new ScannerListener(scannerName));
1379    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
1380    RpcCallback closeCallback =
1381      s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s);
1382    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
1383      needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
1384    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
1385    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
1386      + scannerName + ", " + existing;
1387    return rsh;
1388  }
1389
  private boolean isFullRegionScan(Scan scan, HRegion region) {
    // Account this as a full region scan if the scan start row is less than or equal to the
    // region start key, and the stop row is either empty or greater than or equal to the region
    // end key (when the region end key itself is non-empty).
    return Bytes.compareTo(scan.getStartRow(), region.getRegionInfo().getStartKey()) <= 0
      && (Bytes.compareTo(scan.getStopRow(), region.getRegionInfo().getEndKey()) >= 0
        && !Bytes.equals(region.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW)
        || Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW));
  }
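
  // A minimal sketch of the check above, using hypothetical row keys for illustration only:
  //   Scan full = new Scan(); // empty start and stop rows
  //   Scan partial =
  //     new Scan().withStartRow(Bytes.toBytes("bbb")).withStopRow(Bytes.toBytes("bbc"));
  //   // isFullRegionScan(full, region)    -> true for any region
  //   // isFullRegionScan(partial, region) -> true only when the region's [startKey, endKey) fits
  //   //                                      inside [bbb, bbc), so never for a table's last region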
1405
1406  /**
1407   * Find the HRegion based on a region specifier
1408   * @param regionSpecifier the region specifier
1409   * @return the corresponding region
   * @throws IOException if the specifier is not null but no matching region can be found
1411   */
1412  public HRegion getRegion(final RegionSpecifier regionSpecifier) throws IOException {
1413    return server.getRegion(regionSpecifier.getValue().toByteArray());
1414  }
1415
  /**
   * Find the list of HRegions based on a list of region specifiers
   * @param regionSpecifiers the list of region specifiers
   * @param stats            builder that collects a NotServingRegionException for every specifier
   *                         whose region could not be found
   * @return the corresponding list of regions
   */
1422  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
1423    final CacheEvictionStatsBuilder stats) {
1424    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
1425    for (RegionSpecifier regionSpecifier : regionSpecifiers) {
1426      try {
1427        regions.add(server.getRegion(regionSpecifier.getValue().toByteArray()));
1428      } catch (NotServingRegionException e) {
1429        stats.addException(regionSpecifier.getValue().toByteArray(), e);
1430      }
1431    }
1432    return regions;
1433  }
1434
1435  PriorityFunction getPriority() {
1436    return priority;
1437  }
1438
1439  private RegionServerRpcQuotaManager getRpcQuotaManager() {
1440    return server.getRegionServerRpcQuotaManager();
1441  }
1442
1443  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
1444    return server.getRegionServerSpaceQuotaManager();
1445  }
1446
1447  void start(ZKWatcher zkWatcher) {
1448    this.scannerIdGenerator = new ScannerIdGenerator(this.server.getServerName());
1449    internalStart(zkWatcher);
1450  }
1451
1452  void stop() {
1453    closeAllScanners();
1454    internalStop();
1455  }
1456
1457  /**
1458   * Called to verify that this server is up and running.
1459   */
1460  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
1461  protected void checkOpen() throws IOException {
1462    if (server.isAborted()) {
1463      throw new RegionServerAbortedException("Server " + server.getServerName() + " aborting");
1464    }
1465    if (server.isStopped()) {
1466      throw new RegionServerStoppedException("Server " + server.getServerName() + " stopping");
1467    }
1468    if (!server.isDataFileSystemOk()) {
1469      throw new RegionServerStoppedException("File system not available");
1470    }
1471    if (!server.isOnline()) {
1472      throw new ServerNotRunningYetException(
1473        "Server " + server.getServerName() + " is not running yet");
1474    }
1475  }
1476
1477  /**
1478   * By default, put up an Admin and a Client Service. Set booleans
1479   * <code>hbase.regionserver.admin.executorService</code> and
1480   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
   * Default is that both are enabled. The Client Meta and Bootstrap Node services are controlled
   * by their own boolean configurations in the same way and are also enabled by default.
1482   * @return immutable list of blocking services and the security info classes that this server
1483   *         supports
1484   */
1485  protected List<BlockingServiceAndInterface> getServices() {
1486    boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
1487    boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
1488    boolean clientMeta =
1489      getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
1490    boolean bootstrapNodes =
1491      getConfiguration().getBoolean(REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG, true);
1492    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
1493    if (client) {
1494      bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
1495        ClientService.BlockingInterface.class));
1496    }
1497    if (admin) {
1498      bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
1499        AdminService.BlockingInterface.class));
1500    }
1501    if (clientMeta) {
1502      bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
1503        ClientMetaService.BlockingInterface.class));
1504    }
1505    if (bootstrapNodes) {
1506      bssi.add(
1507        new BlockingServiceAndInterface(BootstrapNodeService.newReflectiveBlockingService(this),
1508          BootstrapNodeService.BlockingInterface.class));
1509    }
1510    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
1511  }
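
  // A minimal sketch (using the boolean keys documented in the javadoc above) of turning off the
  // admin service while keeping the client service, assuming that is desired for a given deploy:
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean("hbase.regionserver.admin.executorService", false);
  //   conf.setBoolean("hbase.regionserver.client.executorService", true); // default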
1512
1513  /**
1514   * Close a region on the region server.
1515   * @param controller the RPC controller
1516   * @param request    the request
1517   */
1518  @Override
1519  @QosPriority(priority = HConstants.ADMIN_QOS)
1520  public CloseRegionResponse closeRegion(final RpcController controller,
1521    final CloseRegionRequest request) throws ServiceException {
1522    final ServerName sn = (request.hasDestinationServer()
1523      ? ProtobufUtil.toServerName(request.getDestinationServer())
1524      : null);
1525
1526    try {
1527      checkOpen();
1528      throwOnWrongStartCode(request);
1529      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1530
1531      requestCount.increment();
1532      if (sn == null) {
1533        LOG.info("Close " + encodedRegionName + " without moving");
1534      } else {
1535        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1536      }
1537      boolean closed = server.closeRegion(encodedRegionName, false, sn);
1538      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1539      return builder.build();
1540    } catch (IOException ie) {
1541      throw new ServiceException(ie);
1542    }
1543  }
1544
1545  /**
1546   * Compact a region on the region server.
1547   * @param controller the RPC controller
1548   * @param request    the request
1549   */
1550  @Override
1551  @QosPriority(priority = HConstants.ADMIN_QOS)
1552  public CompactRegionResponse compactRegion(final RpcController controller,
1553    final CompactRegionRequest request) throws ServiceException {
1554    try {
1555      checkOpen();
1556      requestCount.increment();
1557      HRegion region = getRegion(request.getRegion());
      // Reject the compaction request if quota support is enabled, the requesting user is not a
      // system/super user, and an enforced space quota policy disables compactions for the table.
1560      if (
1561        QuotaUtil.isQuotaEnabled(getConfiguration())
1562          && !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null))
1563          && this.server.getRegionServerSpaceQuotaManager()
1564            .areCompactionsDisabled(region.getTableDescriptor().getTableName())
1565      ) {
        throw new DoNotRetryIOException(
          "Compactions on this region are disabled due to a space quota violation.");
1568      }
1569      region.startRegionOperation(Operation.COMPACT_REGION);
1570      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1571      boolean major = request.hasMajor() && request.getMajor();
1572      if (request.hasFamily()) {
1573        byte[] family = request.getFamily().toByteArray();
1574        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
1575          + region.getRegionInfo().getRegionNameAsString() + " and family "
1576          + Bytes.toString(family);
1577        LOG.trace(log);
1578        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
1579          CompactionLifeCycleTracker.DUMMY);
1580      } else {
1581        String log = "User-triggered " + (major ? "major " : "") + "compaction for region "
1582          + region.getRegionInfo().getRegionNameAsString();
1583        LOG.trace(log);
1584        region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
1585      }
1586      return CompactRegionResponse.newBuilder().build();
1587    } catch (IOException ie) {
1588      throw new ServiceException(ie);
1589    }
1590  }
1591
1592  @Override
1593  @QosPriority(priority = HConstants.ADMIN_QOS)
1594  public CompactionSwitchResponse compactionSwitch(RpcController controller,
1595    CompactionSwitchRequest request) throws ServiceException {
1596    rpcPreCheck("compactionSwitch");
1597    final CompactSplit compactSplitThread = server.getCompactSplitThread();
1598    requestCount.increment();
1599    boolean prevState = compactSplitThread.isCompactionsEnabled();
1600    CompactionSwitchResponse response =
1601      CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
1602    if (prevState == request.getEnabled()) {
1603      // passed in requested state is same as current state. No action required
1604      return response;
1605    }
1606    compactSplitThread.switchCompaction(request.getEnabled());
1607    return response;
1608  }
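
  // Hedged client-side sketch: operators usually reach this RPC through the Admin API rather than
  // raw protobuf; the method shape below is assumed from the public Admin interface and the
  // server name is hypothetical:
  //   try (Connection conn = ConnectionFactory.createConnection(conf);
  //     Admin admin = conn.getAdmin()) {
  //     Map<ServerName, Boolean> prevStates =
  //       admin.compactionSwitch(false, Arrays.asList("rs1.example.com,16020,1700000000000"));
  //   }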
1609
1610  /**
1611   * Flush a region on the region server.
1612   * @param controller the RPC controller
1613   * @param request    the request
1614   */
1615  @Override
1616  @QosPriority(priority = HConstants.ADMIN_QOS)
1617  public FlushRegionResponse flushRegion(final RpcController controller,
1618    final FlushRegionRequest request) throws ServiceException {
1619    try {
1620      checkOpen();
1621      requestCount.increment();
1622      HRegion region = getRegion(request.getRegion());
1623      LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1624      boolean shouldFlush = true;
1625      if (request.hasIfOlderThanTs()) {
1626        shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1627      }
1628      FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1629      if (shouldFlush) {
        boolean writeFlushWalMarker =
          request.hasWriteFlushWalMarker() && request.getWriteFlushWalMarker();
1632        // Go behind the curtain so we can manage writing of the flush WAL marker
1633        HRegion.FlushResultImpl flushResult = null;
1634        if (request.hasFamily()) {
          List<byte[]> families = new ArrayList<>();
1636          families.add(request.getFamily().toByteArray());
1637          TableDescriptor tableDescriptor = region.getTableDescriptor();
1638          List<String> noSuchFamilies = families.stream()
1639            .filter(f -> !tableDescriptor.hasColumnFamily(f)).map(Bytes::toString).toList();
1640          if (!noSuchFamilies.isEmpty()) {
1641            throw new NoSuchColumnFamilyException("Column families " + noSuchFamilies
1642              + " don't exist in table " + tableDescriptor.getTableName().getNameAsString());
1643          }
1644          flushResult =
1645            region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1646        } else {
1647          flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
1648        }
1649        boolean compactionNeeded = flushResult.isCompactionNeeded();
1650        if (compactionNeeded) {
1651          server.getCompactSplitThread().requestSystemCompaction(region,
1652            "Compaction through user triggered flush");
1653        }
1654        builder.setFlushed(flushResult.isFlushSucceeded());
1655        builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1656      }
1657      builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1658      return builder.build();
1659    } catch (DroppedSnapshotException ex) {
1660      // Cache flush can fail in a few places. If it fails in a critical
1661      // section, we get a DroppedSnapshotException and a replay of wal
1662      // is required. Currently the only way to do this is a restart of
1663      // the server.
1664      server.abort("Replay of WAL required. Forcing server shutdown", ex);
1665      throw new ServiceException(ex);
1666    } catch (IOException ie) {
1667      throw new ServiceException(ie);
1668    }
1669  }
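
  // Hedged client-side sketch of how this RPC is typically reached via the Admin API (method
  // shapes assumed from the public Admin interface; table and family names are hypothetical):
  //   admin.flush(TableName.valueOf("usertable"));                       // all column families
  //   admin.flush(TableName.valueOf("usertable"), Bytes.toBytes("cf1")); // a single family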
1670
1671  @Override
1672  @QosPriority(priority = HConstants.ADMIN_QOS)
1673  public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1674    final GetOnlineRegionRequest request) throws ServiceException {
1675    try {
1676      checkOpen();
1677      requestCount.increment();
1678      Map<String, HRegion> onlineRegions = server.getOnlineRegions();
1679      List<RegionInfo> list = new ArrayList<>(onlineRegions.size());
1680      for (HRegion region : onlineRegions.values()) {
1681        list.add(region.getRegionInfo());
1682      }
1683      list.sort(RegionInfo.COMPARATOR);
1684      return ResponseConverter.buildGetOnlineRegionResponse(list);
1685    } catch (IOException ie) {
1686      throw new ServiceException(ie);
1687    }
1688  }
1689
1690  // Master implementation of this Admin Service differs given it is not
1691  // able to supply detail only known to RegionServer. See note on
1692  // MasterRpcServers#getRegionInfo.
1693  @Override
1694  @QosPriority(priority = HConstants.ADMIN_QOS)
1695  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
1696    final GetRegionInfoRequest request) throws ServiceException {
1697    try {
1698      checkOpen();
1699      requestCount.increment();
1700      HRegion region = getRegion(request.getRegion());
1701      RegionInfo info = region.getRegionInfo();
1702      byte[] bestSplitRow;
1703      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
1704        bestSplitRow = region.checkSplit(true).orElse(null);
1705        // when all table data are in memstore, bestSplitRow = null
1706        // try to flush region first
1707        if (bestSplitRow == null) {
1708          region.flush(true);
1709          bestSplitRow = region.checkSplit(true).orElse(null);
1710        }
1711      } else {
1712        bestSplitRow = null;
1713      }
1714      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
1715      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
1716      if (request.hasCompactionState() && request.getCompactionState()) {
1717        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
1718      }
1719      builder.setSplittable(region.isSplittable());
1720      builder.setMergeable(region.isMergeable());
1721      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
1722        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
1723      }
1724      return builder.build();
1725    } catch (IOException ie) {
1726      throw new ServiceException(ie);
1727    }
1728  }
1729
1730  @Override
1731  @QosPriority(priority = HConstants.ADMIN_QOS)
1732  public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request)
1733    throws ServiceException {
1734
1735    List<HRegion> regions;
1736    if (request.hasTableName()) {
1737      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
1738      regions = server.getRegions(tableName);
1739    } else {
1740      regions = server.getRegions();
1741    }
1742    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
1743    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
1744    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1745
1746    try {
1747      for (HRegion region : regions) {
1748        rLoads.add(server.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
1749      }
1750    } catch (IOException e) {
1751      throw new ServiceException(e);
1752    }
1753    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
1754    builder.addAllRegionLoads(rLoads);
1755    return builder.build();
1756  }
1757
1758  @Override
1759  @QosPriority(priority = HConstants.ADMIN_QOS)
1760  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
1761    ClearCompactionQueuesRequest request) throws ServiceException {
1762    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
1763      + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
1764    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
1765    requestCount.increment();
1766    if (clearCompactionQueues.compareAndSet(false, true)) {
1767      final CompactSplit compactSplitThread = server.getCompactSplitThread();
1768      try {
1769        checkOpen();
1770        server.getRegionServerCoprocessorHost().preClearCompactionQueues();
1771        for (String queueName : request.getQueueNameList()) {
1772          LOG.debug("clear " + queueName + " compaction queue");
1773          switch (queueName) {
1774            case "long":
1775              compactSplitThread.clearLongCompactionsQueue();
1776              break;
1777            case "short":
1778              compactSplitThread.clearShortCompactionsQueue();
1779              break;
1780            default:
1781              LOG.warn("Unknown queue name " + queueName);
1782              throw new IOException("Unknown queue name " + queueName);
1783          }
1784        }
1785        server.getRegionServerCoprocessorHost().postClearCompactionQueues();
1786      } catch (IOException ie) {
1787        throw new ServiceException(ie);
1788      } finally {
1789        clearCompactionQueues.set(false);
1790      }
1791    } else {
1792      LOG.warn("Clear compactions queue is executing by other admin.");
1793    }
1794    return respBuilder.build();
1795  }
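
  // Hedged client-side sketch (queue names "long" and "short" match the switch above; the Admin
  // method shape is assumed and the server name is hypothetical):
  //   admin.clearCompactionQueues(ServerName.valueOf("rs1.example.com,16020,1700000000000"),
  //     new HashSet<>(Arrays.asList("long", "short")));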
1796
1797  /**
1798   * Get some information of the region server.
1799   * @param controller the RPC controller
1800   * @param request    the request
1801   */
1802  @Override
1803  @QosPriority(priority = HConstants.ADMIN_QOS)
1804  public GetServerInfoResponse getServerInfo(final RpcController controller,
1805    final GetServerInfoRequest request) throws ServiceException {
1806    try {
1807      checkOpen();
1808    } catch (IOException ie) {
1809      throw new ServiceException(ie);
1810    }
1811    requestCount.increment();
1812    int infoPort = server.getInfoServer() != null ? server.getInfoServer().getPort() : -1;
1813    return ResponseConverter.buildGetServerInfoResponse(server.getServerName(), infoPort);
1814  }
1815
1816  @Override
1817  @QosPriority(priority = HConstants.ADMIN_QOS)
1818  public GetStoreFileResponse getStoreFile(final RpcController controller,
1819    final GetStoreFileRequest request) throws ServiceException {
1820    try {
1821      checkOpen();
1822      HRegion region = getRegion(request.getRegion());
1823      requestCount.increment();
1824      Set<byte[]> columnFamilies;
1825      if (request.getFamilyCount() == 0) {
1826        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
1827      } else {
1828        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
1829        for (ByteString cf : request.getFamilyList()) {
1830          columnFamilies.add(cf.toByteArray());
1831        }
1832      }
1833      int nCF = columnFamilies.size();
1834      List<String> fileList = region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
1835      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1836      builder.addAllStoreFile(fileList);
1837      return builder.build();
1838    } catch (IOException ie) {
1839      throw new ServiceException(ie);
1840    }
1841  }
1842
1843  private void throwOnWrongStartCode(OpenRegionRequest request) throws ServiceException {
1844    if (!request.hasServerStartCode()) {
1845      LOG.warn("OpenRegionRequest for {} does not have a start code", request.getOpenInfoList());
1846      return;
1847    }
1848    throwOnWrongStartCode(request.getServerStartCode());
1849  }
1850
1851  private void throwOnWrongStartCode(CloseRegionRequest request) throws ServiceException {
1852    if (!request.hasServerStartCode()) {
1853      LOG.warn("CloseRegionRequest for {} does not have a start code", request.getRegion());
1854      return;
1855    }
1856    throwOnWrongStartCode(request.getServerStartCode());
1857  }
1858
1859  private void throwOnWrongStartCode(long serverStartCode) throws ServiceException {
1860    // check that we are the same server that this RPC is intended for.
1861    if (server.getServerName().getStartcode() != serverStartCode) {
      throw new ServiceException(new DoNotRetryIOException(
        "This RPC was intended for a different server with startCode: " + serverStartCode
          + ", this server is: " + server.getServerName()));
1865    }
1866  }
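
  // Note on the check above: the start code is the timestamp component of the ServerName (for
  // example, the trailing 1700000000000 in "rs1.example.com,16020,1700000000000" is hypothetical),
  // so an RPC addressed to a previous incarnation of this server on the same host and port is
  // rejected instead of being silently applied.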
1867
1868  private void throwOnWrongStartCode(ExecuteProceduresRequest req) throws ServiceException {
1869    if (req.getOpenRegionCount() > 0) {
1870      for (OpenRegionRequest openReq : req.getOpenRegionList()) {
1871        throwOnWrongStartCode(openReq);
1872      }
1873    }
1874    if (req.getCloseRegionCount() > 0) {
1875      for (CloseRegionRequest closeReq : req.getCloseRegionList()) {
1876        throwOnWrongStartCode(closeReq);
1877      }
1878    }
1879  }
1880
1881  /**
   * Asynchronously open a region or a set of regions on the region server. The opening is
   * coordinated by ZooKeeper, and this method requires the znode to be created before it is
   * called. As a consequence, this method should be called only from the master.
   * <p>
   * The different managed states for the region are:
1887   * </p>
1888   * <ul>
1889   * <li>region not opened: the region opening will start asynchronously.</li>
1890   * <li>a close is already in progress: this is considered as an error.</li>
1891   * <li>an open is already in progress: this new open request will be ignored. This is important
1892   * because the Master can do multiple requests if it crashes.</li>
1893   * <li>the region is already opened: this new open request will be ignored.</li>
1894   * </ul>
1895   * <p>
   * Bulk assign: If there is more than one region to open, the request is considered a bulk
   * assign. For a single region opening, errors are sent through a ServiceException. For bulk
   * assign, errors are put in the response as FAILED_OPENING.
1899   * </p>
1900   * @param controller the RPC controller
1901   * @param request    the request
1902   */
1903  @Override
1904  @QosPriority(priority = HConstants.ADMIN_QOS)
1905  public OpenRegionResponse openRegion(final RpcController controller,
1906    final OpenRegionRequest request) throws ServiceException {
1907    requestCount.increment();
1908    throwOnWrongStartCode(request);
1909
1910    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
1911    final int regionCount = request.getOpenInfoCount();
1912    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
1913    final boolean isBulkAssign = regionCount > 1;
1914    try {
1915      checkOpen();
1916    } catch (IOException ie) {
1917      TableName tableName = null;
1918      if (regionCount == 1) {
1919        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
1920          request.getOpenInfo(0).getRegion();
1921        if (ri != null) {
1922          tableName = ProtobufUtil.toTableName(ri.getTableName());
1923        }
1924      }
1925      if (!TableName.META_TABLE_NAME.equals(tableName)) {
1926        throw new ServiceException(ie);
1927      }
1928      // We are assigning meta, wait a little for regionserver to finish initialization.
1929      // Default to quarter of RPC timeout
1930      int timeout = server.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1931        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
1932      long endTime = EnvironmentEdgeManager.currentTime() + timeout;
1933      synchronized (server.online) {
1934        try {
1935          while (
1936            EnvironmentEdgeManager.currentTime() <= endTime && !server.isStopped()
1937              && !server.isOnline()
1938          ) {
1939            server.online.wait(server.getMsgInterval());
1940          }
1941          checkOpen();
1942        } catch (InterruptedException t) {
1943          Thread.currentThread().interrupt();
1944          throw new ServiceException(t);
1945        } catch (IOException e) {
1946          throw new ServiceException(e);
1947        }
1948      }
1949    }
1950
1951    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1952
1953    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
1954      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
1955      TableDescriptor htd;
1956      try {
1957        String encodedName = region.getEncodedName();
1958        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
1959        final HRegion onlineRegion = server.getRegion(encodedName);
1960        if (onlineRegion != null) {
1961          // The region is already online. This should not happen any more.
1962          String error = "Received OPEN for the region:" + region.getRegionNameAsString()
1963            + ", which is already online";
1964          LOG.warn(error);
1965          // server.abort(error);
1966          // throw new IOException(error);
1967          builder.addOpeningState(RegionOpeningState.OPENED);
1968          continue;
1969        }
1970        LOG.info("Open " + region.getRegionNameAsString());
1971
1972        final Boolean previous =
1973          server.getRegionsInTransitionInRS().putIfAbsent(encodedNameBytes, Boolean.TRUE);
1974
1975        if (Boolean.FALSE.equals(previous)) {
1976          if (server.getRegion(encodedName) != null) {
1977            // There is a close in progress. This should not happen any more.
1978            String error = "Received OPEN for the region:" + region.getRegionNameAsString()
1979              + ", which we are already trying to CLOSE";
1980            server.abort(error);
1981            throw new IOException(error);
1982          }
1983          server.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
1984        }
1985
1986        if (Boolean.TRUE.equals(previous)) {
1987          // An open is in progress. This is supported, but let's log this.
1988          LOG.info("Receiving OPEN for the region:" + region.getRegionNameAsString()
1989            + ", which we are already trying to OPEN"
1990            + " - ignoring this new request for this region.");
1991        }
1992
1993        // We are opening this region. If it moves back and forth for whatever reason, we don't
1994        // want to keep returning the stale moved record while we are opening/if we close again.
1995        server.removeFromMovedRegions(region.getEncodedName());
1996
1997        if (previous == null || !previous.booleanValue()) {
1998          htd = htds.get(region.getTable());
1999          if (htd == null) {
2000            htd = server.getTableDescriptors().get(region.getTable());
2001            htds.put(region.getTable(), htd);
2002          }
2003          if (htd == null) {
2004            throw new IOException("Missing table descriptor for " + region.getEncodedName());
2005          }
2006          // If there is no action in progress, we can submit a specific handler.
2007          // Need to pass the expected version in the constructor.
2008          if (server.getExecutorService() == null) {
2009            LOG.info("No executor executorService; skipping open request");
2010          } else {
2011            if (region.isMetaRegion()) {
2012              server.getExecutorService()
2013                .submit(new OpenMetaHandler(server, server, region, htd, masterSystemTime));
2014            } else {
2015              if (regionOpenInfo.getFavoredNodesCount() > 0) {
2016                server.updateRegionFavoredNodesMapping(region.getEncodedName(),
2017                  regionOpenInfo.getFavoredNodesList());
2018              }
2019              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
2020                server.getExecutorService().submit(
2021                  new OpenPriorityRegionHandler(server, server, region, htd, masterSystemTime));
2022              } else {
2023                server.getExecutorService()
2024                  .submit(new OpenRegionHandler(server, server, region, htd, masterSystemTime));
2025              }
2026            }
2027          }
2028        }
2029
2030        builder.addOpeningState(RegionOpeningState.OPENED);
2031      } catch (IOException ie) {
2032        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
2033        if (isBulkAssign) {
2034          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
2035        } else {
2036          throw new ServiceException(ie);
2037        }
2038      }
2039    }
2040    return builder.build();
2041  }
2042
2043  /**
   * Warmup a region on this server. This method should only be called by the Master. It
   * synchronously opens and then closes the region, bringing the region's most important pages
   * into the cache.
2046   */
2047  @Override
2048  public WarmupRegionResponse warmupRegion(final RpcController controller,
2049    final WarmupRegionRequest request) throws ServiceException {
2050    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
2051    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
2052    try {
2053      checkOpen();
2054      String encodedName = region.getEncodedName();
2055      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
2056      final HRegion onlineRegion = server.getRegion(encodedName);
2057      if (onlineRegion != null) {
2058        LOG.info("{} is online; skipping warmup", region);
2059        return response;
2060      }
2061      TableDescriptor htd = server.getTableDescriptors().get(region.getTable());
2062      if (server.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
2063        LOG.info("{} is in transition; skipping warmup", region);
2064        return response;
2065      }
2066      LOG.info("Warmup {}", region.getRegionNameAsString());
2067      HRegion.warmupHRegion(region, htd, server.getWAL(region), server.getConfiguration(), server,
2068        null);
2069    } catch (IOException ie) {
2070      LOG.error("Failed warmup of {}", region.getRegionNameAsString(), ie);
2071      throw new ServiceException(ie);
2072    }
2073
2074    return response;
2075  }
2076
2077  private ExtendedCellScanner getAndReset(RpcController controller) {
2078    HBaseRpcController hrc = (HBaseRpcController) controller;
2079    ExtendedCellScanner cells = hrc.cellScanner();
2080    hrc.setCellScanner(null);
2081    return cells;
2082  }
2083
2084  /**
   * Replay the given changes, as done when distributedLogReplay replays WAL edits from a failed
   * RS. The guarantee is that the given mutations will be durable on the receiving RS if this
   * method returns without any exception.
2088   * @param controller the RPC controller
2089   * @param request    the request
2090   * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for
2091   *             compatibility with old region replica implementation. Now we will use
2092   *             {@code replicateToReplica} method instead.
2093   */
2094  @Deprecated
2095  @Override
2096  @QosPriority(priority = HConstants.REPLAY_QOS)
2097  public ReplicateWALEntryResponse replay(final RpcController controller,
2098    final ReplicateWALEntryRequest request) throws ServiceException {
2099    long before = EnvironmentEdgeManager.currentTime();
2100    ExtendedCellScanner cells = getAndReset(controller);
2101    try {
2102      checkOpen();
2103      List<WALEntry> entries = request.getEntryList();
2104      if (entries == null || entries.isEmpty()) {
2105        // empty input
2106        return ReplicateWALEntryResponse.newBuilder().build();
2107      }
2108      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
2109      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
2110      RegionCoprocessorHost coprocessorHost =
2111        ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
2112          ? region.getCoprocessorHost()
2113          : null; // do not invoke coprocessors if this is a secondary region replica
2114
2115      // Skip adding the edits to WAL if this is a secondary region replica
2116      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
2117      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
2118
2119      for (WALEntry entry : entries) {
2120        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
2121          throw new NotServingRegionException("Replay request contains entries from multiple "
2122            + "regions. First region:" + regionName.toStringUtf8() + " , other region:"
2123            + entry.getKey().getEncodedRegionName());
2124        }
2125        if (server.nonceManager != null && isPrimary) {
2126          long nonceGroup =
2127            entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
2128          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
2129          server.nonceManager.reportOperationFromWal(nonceGroup, nonce,
2130            entry.getKey().getWriteTime());
2131        }
2132        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
2133        List<MutationReplay> edits =
2134          WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
2135        if (edits != null && !edits.isEmpty()) {
2136          // HBASE-17924
2137          // sort to improve lock efficiency
2138          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
2139          long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
2140            ? entry.getKey().getOrigSequenceNumber()
2141            : entry.getKey().getLogSequenceNumber();
2142          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
2143          // check if it's a partial success
2144          for (int i = 0; result != null && i < result.length; i++) {
2145            if (result[i] != OperationStatus.SUCCESS) {
2146              throw new IOException(result[i].getExceptionMsg());
2147            }
2148          }
2149        }
2150      }
2151
2152      // sync wal at the end because ASYNC_WAL is used above
2153      WAL wal = region.getWAL();
2154      if (wal != null) {
2155        wal.sync();
2156      }
2157      return ReplicateWALEntryResponse.newBuilder().build();
2158    } catch (IOException ie) {
2159      throw new ServiceException(ie);
2160    } finally {
2161      final MetricsRegionServer metricsRegionServer = server.getMetrics();
2162      if (metricsRegionServer != null) {
2163        metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
2164      }
2165    }
2166  }
2167
2168  /**
2169   * Replay the given changes on a secondary replica
2170   */
2171  @Override
2172  public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
2173    ReplicateWALEntryRequest request) throws ServiceException {
2174    CellScanner cells = getAndReset(controller);
2175    try {
2176      checkOpen();
2177      List<WALEntry> entries = request.getEntryList();
2178      if (entries == null || entries.isEmpty()) {
2179        // empty input
2180        return ReplicateWALEntryResponse.newBuilder().build();
2181      }
2182      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
2183      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
2184      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2185        throw new DoNotRetryIOException(
2186          "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
2187      }
2188      for (WALEntry entry : entries) {
2189        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
2190          throw new NotServingRegionException(
2191            "ReplicateToReplica request contains entries from multiple " + "regions. First region:"
2192              + regionName.toStringUtf8() + " , other region:"
2193              + entry.getKey().getEncodedRegionName());
2194        }
2195        region.replayWALEntry(entry, cells);
2196      }
2197      return ReplicateWALEntryResponse.newBuilder().build();
2198    } catch (IOException ie) {
2199      throw new ServiceException(ie);
2200    }
2201  }
2202
2203  private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
2204    ReplicationSourceService replicationSource = server.getReplicationSourceService();
2205    if (replicationSource == null || entries.isEmpty()) {
2206      return;
2207    }
    // We can ensure that all entries are for one peer, so we only need to check one entry's
    // table name. If the table uses sync replication at the peer side and the peer cluster is (or
    // is transitioning to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject applying those
    // entries, according to the design doc.
2212    TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray());
2213    if (
2214      replicationSource.getSyncReplicationPeerInfoProvider().checkState(table,
2215        RejectReplicationRequestStateChecker.get())
2216    ) {
      throw new DoNotRetryIOException(
        "Rejecting the apply on the sink cluster because its sync replication state is ACTIVE or "
          + "DOWNGRADE_ACTIVE, table: " + table);
2220    }
2221  }
2222
2223  /**
2224   * Replicate WAL entries on the region server.
2225   * @param controller the RPC controller
2226   * @param request    the request
2227   */
2228  @Override
2229  @QosPriority(priority = HConstants.REPLICATION_QOS)
2230  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
2231    final ReplicateWALEntryRequest request) throws ServiceException {
2232    try {
2233      checkOpen();
2234      if (server.getReplicationSinkService() != null) {
2235        requestCount.increment();
2236        List<WALEntry> entries = request.getEntryList();
2237        checkShouldRejectReplicationRequest(entries);
2238        ExtendedCellScanner cellScanner = getAndReset(controller);
2239        server.getRegionServerCoprocessorHost().preReplicateLogEntries();
2240        server.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
2241          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
2242          request.getSourceHFileArchiveDirPath());
2243        server.getRegionServerCoprocessorHost().postReplicateLogEntries();
2244        return ReplicateWALEntryResponse.newBuilder().build();
2245      } else {
2246        throw new ServiceException("Replication services are not initialized yet");
2247      }
2248    } catch (IOException ie) {
2249      throw new ServiceException(ie);
2250    }
2251  }
2252
2253  /**
2254   * Roll the WAL writer of the region server.
2255   * @param controller the RPC controller
2256   * @param request    the request
2257   */
2258  @Override
2259  @QosPriority(priority = HConstants.ADMIN_QOS)
2260  public RollWALWriterResponse rollWALWriter(final RpcController controller,
2261    final RollWALWriterRequest request) throws ServiceException {
2262    try {
2263      checkOpen();
2264      requestCount.increment();
2265      server.getRegionServerCoprocessorHost().preRollWALWriterRequest();
2266      server.getWalRoller().requestRollAll();
2267      server.getRegionServerCoprocessorHost().postRollWALWriterRequest();
2268      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
2269      return builder.build();
2270    } catch (IOException ie) {
2271      throw new ServiceException(ie);
2272    }
2273  }
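
  // Hedged client-side sketch (Admin method shape assumed, server name hypothetical): an operator
  // can request a WAL roll on a specific region server with
  //   admin.rollWALWriter(ServerName.valueOf("rs1.example.com,16020,1700000000000"));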
2274
2275  /**
2276   * Stop the region server.
2277   * @param controller the RPC controller
2278   * @param request    the request
2279   */
2280  @Override
2281  @QosPriority(priority = HConstants.ADMIN_QOS)
2282  public StopServerResponse stopServer(final RpcController controller,
2283    final StopServerRequest request) throws ServiceException {
2284    rpcPreCheck("stopServer");
2285    requestCount.increment();
2286    String reason = request.getReason();
2287    server.stop(reason);
2288    return StopServerResponse.newBuilder().build();
2289  }
2290
2291  @Override
2292  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
2293    UpdateFavoredNodesRequest request) throws ServiceException {
2294    rpcPreCheck("updateFavoredNodes");
2295    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
2296    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
2297    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
2298      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
2299      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
2300        server.updateRegionFavoredNodesMapping(hri.getEncodedName(),
2301          regionUpdateInfo.getFavoredNodesList());
2302      }
2303    }
2304    respBuilder.setResponse(openInfoList.size());
2305    return respBuilder.build();
2306  }
2307
2308  /**
2309   * Atomically bulk load several HFiles into an open region
   * @return true if successful, false if the load failed but is recoverable (no action taken)
2311   * @throws ServiceException if failed unrecoverably
2312   */
2313  @Override
2314  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
2315    final BulkLoadHFileRequest request) throws ServiceException {
2316    long start = EnvironmentEdgeManager.currentTime();
2317    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
2318    if (clusterIds.contains(this.server.getClusterId())) {
2319      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
2320    } else {
2321      clusterIds.add(this.server.getClusterId());
2322    }
2323    try {
2324      checkOpen();
2325      requestCount.increment();
2326      HRegion region = getRegion(request.getRegion());
2327      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
2328      long sizeToBeLoaded = -1;
2329
2330      // Check to see if this bulk load would exceed the space quota for this table
2331      if (spaceQuotaEnabled) {
2332        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
2333        SpaceViolationPolicyEnforcement enforcement =
2334          activeSpaceQuotas.getPolicyEnforcement(region);
2335        if (enforcement != null) {
2336          // Bulk loads must still be atomic. We must enact all or none.
2337          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
2338          for (FamilyPath familyPath : request.getFamilyPathList()) {
2339            filePaths.add(familyPath.getPath());
2340          }
2341          // Check if the batch of files exceeds the current quota
2342          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
2343        }
2344      }
2345      // secure bulk load
2346      Map<byte[], List<Path>> map =
2347        server.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
2348      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
2349      builder.setLoaded(map != null);
2350      if (map != null) {
        // A negative size is a flag to skip updating the region size, since a real bulk load can
        // never involve a file with a negative size.
2353        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
2354          if (LOG.isTraceEnabled()) {
2355            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
2356              + sizeToBeLoaded + " bytes");
2357          }
2358          // Inform space quotas of the new files for this region
2359          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(region.getRegionInfo(),
2360            sizeToBeLoaded);
2361        }
2362      }
2363      return builder.build();
2364    } catch (IOException ie) {
2365      throw new ServiceException(ie);
2366    } finally {
2367      final MetricsRegionServer metricsRegionServer = server.getMetrics();
2368      if (metricsRegionServer != null) {
2369        metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
2370      }
2371    }
2372  }
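
  // Illustrative sketch (not part of this class): how a client typically drives this RPC. The
  // usual entry point is the client-side bulk load tool rather than calling bulkLoadHFile
  // directly; the table name and output path below are hypothetical.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   // Load HFiles previously written by HFileOutputFormat2 into the target table.
  //   BulkLoadHFiles.create(conf).bulkLoad(TableName.valueOf("my_table"),
  //     new Path("/tmp/hfile-output"));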
2373
2374  @Override
2375  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
2376    PrepareBulkLoadRequest request) throws ServiceException {
2377    try {
2378      checkOpen();
2379      requestCount.increment();
2380
2381      HRegion region = getRegion(request.getRegion());
2382
2383      String bulkToken = server.getSecureBulkLoadManager().prepareBulkLoad(region, request);
2384      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
2385      builder.setBulkToken(bulkToken);
2386      return builder.build();
2387    } catch (IOException ie) {
2388      throw new ServiceException(ie);
2389    }
2390  }
2391
2392  @Override
2393  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
2394    CleanupBulkLoadRequest request) throws ServiceException {
2395    try {
2396      checkOpen();
2397      requestCount.increment();
2398
2399      HRegion region = getRegion(request.getRegion());
2400
2401      server.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
2402      return CleanupBulkLoadResponse.newBuilder().build();
2403    } catch (IOException ie) {
2404      throw new ServiceException(ie);
2405    }
2406  }
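
  // Ordering note (sketch of the secure bulk load flow handled by the client-side tool): a bulk
  // load session is expected to be bracketed by these RPCs, roughly:
  //
  //   String token = prepareBulkLoad(...).getBulkToken(); // stage a secure staging directory
  //   bulkLoadHFile(...);                                 // request carries the token, loads atomically
  //   cleanupBulkLoad(...);                               // remove the staging directory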
2407
2408  @Override
2409  public CoprocessorServiceResponse execService(final RpcController controller,
2410    final CoprocessorServiceRequest request) throws ServiceException {
2411    try {
2412      checkOpen();
2413      requestCount.increment();
2414      HRegion region = getRegion(request.getRegion());
2415      Message result = execServiceOnRegion(region, request.getCall());
2416      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
2417      builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
2418        region.getRegionInfo().getRegionName()));
2419      // TODO: COPIES!!!!!!
2420      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
2421        org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
2422      return builder.build();
2423    } catch (IOException ie) {
2424      throw new ServiceException(ie);
2425    }
2426  }
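
  // Illustrative sketch (not part of this class): how a client typically reaches execService.
  // "MyService" stands for a hypothetical protobuf-generated coprocessor endpoint; the Table API
  // calls are the standard client API.
  //
  //   try (Connection conn = ConnectionFactory.createConnection(conf);
  //     Table table = conn.getTable(TableName.valueOf("my_table"))) {
  //     CoprocessorRpcChannel channel = table.coprocessorService(Bytes.toBytes("row1"));
  //     MyService.BlockingInterface stub = MyService.newBlockingStub(channel);
  //     // Calls on the stub are routed to the region hosting "row1" and end up here.
  //   }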
2427
2428  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
2429    if (filePaths.isEmpty()) {
2430      // local hdfs
2431      return server.getFileSystem();
2432    }
2433    // source hdfs
2434    return new Path(filePaths.get(0)).getFileSystem(server.getConfiguration());
2435  }
2436
2437  private Message execServiceOnRegion(HRegion region,
2438    final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
2439    // ignore the passed in controller (from the serialized call)
2440    ServerRpcController execController = new ServerRpcController();
2441    return region.execService(execController, serviceCall);
2442  }
2443
2444  private boolean shouldRejectRequestsFromClient(HRegion region) {
2445    TableName table = region.getRegionInfo().getTable();
2446    ReplicationSourceService service = server.getReplicationSourceService();
2447    return service != null && service.getSyncReplicationPeerInfoProvider().checkState(table,
2448      RejectRequestsFromClientStateChecker.get());
2449  }
2450
2451  private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
2452    if (shouldRejectRequestsFromClient(region)) {
2453      throw new DoNotRetryIOException(
2454        region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state.");
2455    }
2456  }
2457
2458  /**
2459   * Get data from a table.
2460   * @param controller the RPC controller
2461   * @param request    the get request
2462   */
2463  @Override
2464  public GetResponse get(final RpcController controller, final GetRequest request)
2465    throws ServiceException {
2466    long before = EnvironmentEdgeManager.currentTime();
2467    OperationQuota quota = null;
2468    HRegion region = null;
2469    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2470    try {
2471      checkOpen();
2472      requestCount.increment();
2473      rpcGetRequestCount.increment();
2474      region = getRegion(request.getRegion());
2475      rejectIfInStandByState(region);
2476
2477      GetResponse.Builder builder = GetResponse.newBuilder();
2478      ClientProtos.Get get = request.getGet();
2479      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
2480      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
2481      // to switch and do the hbase2 protocol (HBase servers do not tell clients what versions
2482      // they are; it's a problem for non-native clients like asynchbase). See HBASE-20225.
2483      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
2484        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
2485          + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
2486          + "reverse Scan.");
2487      }
2488      Boolean existence = null;
2489      Result r = null;
2490      quota = getRpcQuotaManager().checkBatchQuota(region, OperationQuota.OperationType.GET);
2491
2492      Get clientGet = ProtobufUtil.toGet(get);
2493      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2494        existence = region.getCoprocessorHost().preExists(clientGet);
2495      }
2496      if (existence == null) {
2497        if (context != null) {
2498          r = get(clientGet, (region), null, context);
2499        } else {
2500          // for test purposes
2501          r = region.get(clientGet);
2502        }
2503        if (get.getExistenceOnly()) {
2504          boolean exists = r.getExists();
2505          if (region.getCoprocessorHost() != null) {
2506            exists = region.getCoprocessorHost().postExists(clientGet, exists);
2507          }
2508          existence = exists;
2509        }
2510      }
2511      if (existence != null) {
2512        ClientProtos.Result pbr =
2513          ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
2514        builder.setResult(pbr);
2515      } else if (r != null) {
2516        ClientProtos.Result pbr;
2517        if (
2518          isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
2519            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)
2520        ) {
2521          pbr = ProtobufUtil.toResultNoData(r);
2522          ((HBaseRpcController) controller).setCellScanner(
2523            PrivateCellUtil.createExtendedCellScanner(ClientInternalHelper.getExtendedRawCells(r)));
2524          addSize(context, r);
2525        } else {
2526          pbr = ProtobufUtil.toResult(r);
2527        }
2528        builder.setResult(pbr);
2529      }
2530      // r.cells is null for a table.exists(get) call
2531      if (r != null && r.rawCells() != null) {
2532        quota.addGetResult(r);
2533      }
2534      return builder.build();
2535    } catch (IOException ie) {
2536      throw new ServiceException(ie);
2537    } finally {
2538      final MetricsRegionServer metricsRegionServer = server.getMetrics();
2539      if (metricsRegionServer != null && region != null) {
2540        long blockBytesScanned = context != null ? context.getBlockBytesScanned() : 0;
2541        metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
2542          blockBytesScanned);
2543      }
2544      if (quota != null) {
2545        quota.close();
2546      }
2547    }
2548  }
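
  // Illustrative sketch (not part of this class): the client-side call that ends up in the get
  // RPC above. Table, family and qualifier names are hypothetical.
  //
  //   try (Connection conn = ConnectionFactory.createConnection(conf);
  //     Table table = conn.getTable(TableName.valueOf("my_table"))) {
  //     Get g = new Get(Bytes.toBytes("row1")).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"));
  //     Result r = table.get(g);                                  // one GetRequest to this server
  //     boolean exists = table.exists(new Get(Bytes.toBytes("row1"))); // existence-only variant
  //   }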
2549
2550  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
2551    RpcCallContext context) throws IOException {
2552    region.prepareGet(get);
2553    boolean stale = region.getRegionInfo().getReplicaId() != 0;
2554
2555    // This method is almost the same as HRegion#get.
2556    List<Cell> results = new ArrayList<>();
2557    // pre-get CP hook
2558    if (region.getCoprocessorHost() != null) {
2559      if (region.getCoprocessorHost().preGet(get, results)) {
2560        region.metricsUpdateForGet();
2561        return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null,
2562          stale);
2563      }
2564    }
2565    Scan scan = new Scan(get);
2566    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
2567      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
2568    }
2569    RegionScannerImpl scanner = null;
2570    try {
2571      scanner = region.getScanner(scan);
2572      scanner.next(results);
2573    } finally {
2574      if (scanner != null) {
2575        if (closeCallBack == null) {
2576          // If there is a context then the scanner can be added to the current
2577          // RpcCallContext. The rpc callback will take care of closing the
2578          // scanner, e.g. in the
2579          // case of get().
2580          context.setCallBack(scanner);
2581        } else {
2582          // The call is from multi() where the results from the get() are
2583          // aggregated and then sent out to the
2584          // rpc. The rpc callback will close all such scanners created as part
2585          // of multi().
2586          closeCallBack.addScanner(scanner);
2587        }
2588      }
2589    }
2590
2591    // post-get CP hook
2592    if (region.getCoprocessorHost() != null) {
2593      region.getCoprocessorHost().postGet(get, results);
2594    }
2595    region.metricsUpdateForGet();
2596
2597    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
2598  }
2599
2600  private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
2601    int sum = 0;
2602    String firstRegionName = null;
2603    for (RegionAction regionAction : request.getRegionActionList()) {
2604      if (sum == 0) {
2605        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
2606      }
2607      sum += regionAction.getActionCount();
2608    }
2609    if (sum > rowSizeWarnThreshold) {
2610      LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
2611        + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
2612        + RpcServer.getRequestUserName().orElse(null) + "/"
2613        + RpcServer.getRemoteAddress().orElse(null) + " first region in multi=" + firstRegionName);
2614      if (rejectRowsWithSizeOverThreshold) {
2615        throw new ServiceException(
2616          "Rejecting large batch operation for current batch with firstRegionName: "
2617            + firstRegionName + " , Requested Number of Rows: " + sum + " , Size Threshold: "
2618            + rowSizeWarnThreshold);
2619      }
2620    }
2621  }
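
  // Worked example of the check above (illustrative numbers): a MultiRequest carrying three
  // RegionActions with 4000, 1500 and 200 actions sums to 5700. With rowSizeWarnThreshold at a
  // hypothetical 5000, the warning is logged; if rejectRowsWithSizeOverThreshold is also set, the
  // whole multi call is rejected with a ServiceException. firstRegionName is taken from the first
  // RegionAction only, as a hint for the operator.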
2622
2623  private void failRegionAction(MultiResponse.Builder responseBuilder,
2624    RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
2625    CellScanner cellScanner, Throwable error) {
2626    rpcServer.getMetrics().exception(error);
2627    regionActionResultBuilder.setException(ResponseConverter.buildException(error));
2628    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2629    // None of the Mutations in this RegionAction were executed, as the Region is not online
2630    // on this RS. The Client will retry. Skip all the Cells in the CellScanner that
2631    // correspond to these Mutations.
2632    if (cellScanner != null) {
2633      skipCellsForMutations(regionAction.getActionList(), cellScanner);
2634    }
2635  }
2636
2637  private boolean isReplicationRequest(Action action) {
2638    // A replication request can only be a put or a delete.
2639    if (!action.hasMutation()) {
2640      return false;
2641    }
2642    MutationProto mutation = action.getMutation();
2643    MutationType type = mutation.getMutateType();
2644    if (type != MutationType.PUT && type != MutationType.DELETE) {
2645      return false;
2646    }
2647    // Replication sets a special attribute, so we can use it to decide whether a request
2648    // is for replication.
2649    return mutation.getAttributeList().stream().map(p -> p.getName())
2650      .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
2651  }
2652
2653  /**
2654   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2655   * @param rpcc    the RPC controller
2656   * @param request the multi request
2657   */
2658  @Override
2659  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2660    throws ServiceException {
2661    try {
2662      checkOpen();
2663    } catch (IOException ie) {
2664      throw new ServiceException(ie);
2665    }
2666
2667    checkBatchSizeAndLogLargeSize(request);
2668
2669    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2670    // It is also the conduit via which we pass back data.
2671    HBaseRpcController controller = (HBaseRpcController) rpcc;
2672    CellScanner cellScanner = controller != null ? getAndReset(controller) : null;
2673
2674    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2675
2676    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2677    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2678    this.rpcMultiRequestCount.increment();
2679    this.requestCount.increment();
2680    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2681
2682    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
2683    // following logic is for backward compatibility as old clients still use
2684    // MultiRequest#condition in case of checkAndMutate with RowMutations.
2685    if (request.hasCondition()) {
2686      if (request.getRegionActionList().isEmpty()) {
2687        // If the region action list is empty, do nothing.
2688        responseBuilder.setProcessed(true);
2689        return responseBuilder.build();
2690      }
2691
2692      RegionAction regionAction = request.getRegionAction(0);
2693
2694      // When request.hasCondition() is true, regionAction.getAtomic() should always be true. So
2695      // we can assume regionAction.getAtomic() is true here.
2696      assert regionAction.getAtomic();
2697
2698      OperationQuota quota;
2699      HRegion region;
2700      RegionSpecifier regionSpecifier = regionAction.getRegion();
2701
2702      try {
2703        region = getRegion(regionSpecifier);
2704        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
2705          regionAction.hasCondition());
2706      } catch (IOException e) {
2707        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2708        return responseBuilder.build();
2709      }
2710
2711      try {
2712        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
2713        // We only allow replication in standby state and it will not set the atomic flag.
2714        if (rejectIfFromClient) {
2715          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2716            new DoNotRetryIOException(
2717              region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2718          return responseBuilder.build();
2719        }
2720
2721        try {
2722          CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2723            cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
2724          responseBuilder.setProcessed(result.isSuccess());
2725          ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2726            ClientProtos.ResultOrException.newBuilder();
2727          for (int i = 0; i < regionAction.getActionCount(); i++) {
2728            // To unify the response format with doNonAtomicRegionMutation and read through
2729            // client's AsyncProcess we have to add an empty result instance per operation
2730            resultOrExceptionOrBuilder.clear();
2731            resultOrExceptionOrBuilder.setIndex(i);
2732            regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2733          }
2734        } catch (IOException e) {
2735          rpcServer.getMetrics().exception(e);
2736          // As it's an atomic operation with a condition, we may expect it's a global failure.
2737          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2738        }
2739      } finally {
2740        quota.close();
2741      }
2742
2743      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2744      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2745      if (regionLoadStats != null) {
2746        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
2747          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
2748      }
2749      return responseBuilder.build();
2750    }
2751
2752    // this will contain all the cells that we need to return. It's created later, if needed.
2753    List<ExtendedCellScannable> cellsToReturn = null;
2754    RegionScannersCloseCallBack closeCallBack = null;
2755    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2756    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
2757      new HashMap<>(request.getRegionActionCount());
2758
2759    for (RegionAction regionAction : request.getRegionActionList()) {
2760      OperationQuota quota;
2761      HRegion region;
2762      RegionSpecifier regionSpecifier = regionAction.getRegion();
2763      regionActionResultBuilder.clear();
2764
2765      try {
2766        region = getRegion(regionSpecifier);
2767        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
2768          regionAction.hasCondition());
2769      } catch (IOException e) {
2770        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
2771        continue; // For this region it's a failure.
2772      }
2773
2774      try {
2775        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
2776
2777        if (regionAction.hasCondition()) {
2778          // We only allow replication in standby state and it will not set the atomic flag.
2779          if (rejectIfFromClient) {
2780            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2781              new DoNotRetryIOException(
2782                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2783            continue;
2784          }
2785
2786          try {
2787            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
2788              ClientProtos.ResultOrException.newBuilder();
2789            if (regionAction.getActionCount() == 1) {
2790              CheckAndMutateResult result =
2791                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
2792                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
2793              regionActionResultBuilder.setProcessed(result.isSuccess());
2794              resultOrExceptionOrBuilder.setIndex(0);
2795              if (result.getResult() != null) {
2796                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
2797              }
2798              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2799            } else {
2800              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
2801                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
2802              regionActionResultBuilder.setProcessed(result.isSuccess());
2803              for (int i = 0; i < regionAction.getActionCount(); i++) {
2804                if (i == 0 && result.getResult() != null) {
2805                  // Set the result of the Increment/Append operations to the first element of the
2806                  // ResultOrException list
2807                  resultOrExceptionOrBuilder.setIndex(i);
2808                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
2809                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
2810                  continue;
2811                }
2812                // To unify the response format with doNonAtomicRegionMutation and read through
2813                // client's AsyncProcess we have to add an empty result instance per operation
2814                resultOrExceptionOrBuilder.clear();
2815                resultOrExceptionOrBuilder.setIndex(i);
2816                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
2817              }
2818            }
2819          } catch (IOException e) {
2820            rpcServer.getMetrics().exception(e);
2821            // As it's an atomic operation with a condition, we may expect it's a global failure.
2822            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2823          }
2824        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2825          // We only allow replication in standby state and it will not set the atomic flag.
2826          if (rejectIfFromClient) {
2827            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2828              new DoNotRetryIOException(
2829                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2830            continue;
2831          }
2832          try {
2833            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
2834              cellScanner, nonceGroup, spaceQuotaEnforcement);
2835            regionActionResultBuilder.setProcessed(true);
2836            // We no longer use MultiResponse#processed. Instead, we use
2837            // RegionActionResult#processed. This is for backward compatibility for old clients.
2838            responseBuilder.setProcessed(true);
2839          } catch (IOException e) {
2840            rpcServer.getMetrics().exception(e);
2841            // As it's atomic, we may expect it's a global failure.
2842            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2843          }
2844        } else {
2845          if (
2846            rejectIfFromClient && regionAction.getActionCount() > 0
2847              && !isReplicationRequest(regionAction.getAction(0))
2848          ) {
2849            // fail if it is not a replication request
2850            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
2851              new DoNotRetryIOException(
2852                region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
2853            continue;
2854          }
2855          // doNonAtomicRegionMutation manages the exception internally
2856          if (context != null && closeCallBack == null) {
2857            // An RpcCallBack that creates a list of scanners that needs to perform callBack
2858            // operation on completion of multiGets.
2859            // Set this only once
2860            closeCallBack = new RegionScannersCloseCallBack();
2861            context.setCallBack(closeCallBack);
2862          }
2863          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2864            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
2865            spaceQuotaEnforcement);
2866        }
2867      } finally {
2868        quota.close();
2869      }
2870
2871      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2872      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
2873      if (regionLoadStats != null) {
2874        regionStats.put(regionSpecifier, regionLoadStats);
2875      }
2876    }
2877    // Load the controller with the Cells to return.
2878    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2879      controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(cellsToReturn));
2880    }
2881
2882    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
2883    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
2884      builder.addRegion(stat.getKey());
2885      builder.addStat(stat.getValue());
2886    }
2887    responseBuilder.setRegionStatistics(builder);
2888    return responseBuilder.build();
2889  }
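
  // Illustrative sketch (not part of this class): a client-side batch that arrives here as a
  // single MultiRequest. Names are hypothetical; Table#batch is the standard client API.
  //
  //   try (Connection conn = ConnectionFactory.createConnection(conf);
  //     Table table = conn.getTable(TableName.valueOf("my_table"))) {
  //     List<Row> actions = new ArrayList<>();
  //     actions.add(new Put(Bytes.toBytes("r1")).addColumn(Bytes.toBytes("cf"),
  //       Bytes.toBytes("q"), Bytes.toBytes("v")));
  //     actions.add(new Get(Bytes.toBytes("r2")));
  //     Object[] results = new Object[actions.size()];
  //     table.batch(actions, results); // the client groups actions per region into RegionActions
  //   }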
2890
2891  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2892    if (cellScanner == null) {
2893      return;
2894    }
2895    for (Action action : actions) {
2896      skipCellsForMutation(action, cellScanner);
2897    }
2898  }
2899
2900  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2901    if (cellScanner == null) {
2902      return;
2903    }
2904    try {
2905      if (action.hasMutation()) {
2906        MutationProto m = action.getMutation();
2907        if (m.hasAssociatedCellCount()) {
2908          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2909            cellScanner.advance();
2910          }
2911        }
2912      }
2913    } catch (IOException e) {
2914      // No need to handle these individual Mutation level issues. Anyway this entire RegionAction
2915      // is marked as failed because we could not see the Region here. On the client side the top
2916      // level RegionAction exception will be considered first.
2917      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2918    }
2919  }
2920
2921  /**
2922   * Mutate data in a table.
2923   * @param rpcc    the RPC controller
2924   * @param request the mutate request
2925   */
2926  @Override
2927  public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
2928    throws ServiceException {
2929    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
2930    // It is also the conduit via which we pass back data.
2931    HBaseRpcController controller = (HBaseRpcController) rpcc;
2932    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2933    OperationQuota quota = null;
2934    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
2935    // Clear scanner so we are not holding on to reference across call.
2936    if (controller != null) {
2937      controller.setCellScanner(null);
2938    }
2939    try {
2940      checkOpen();
2941      requestCount.increment();
2942      rpcMutateRequestCount.increment();
2943      HRegion region = getRegion(request.getRegion());
2944      rejectIfInStandByState(region);
2945      MutateResponse.Builder builder = MutateResponse.newBuilder();
2946      MutationProto mutation = request.getMutation();
2947      if (!region.getRegionInfo().isMetaRegion()) {
2948        server.getMemStoreFlusher().reclaimMemStoreMemory();
2949      }
2950      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2951      OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request);
2952      quota = getRpcQuotaManager().checkBatchQuota(region, operationType);
2953      ActivePolicyEnforcement spaceQuotaEnforcement =
2954        getSpaceQuotaManager().getActiveEnforcements();
2955
2956      if (request.hasCondition()) {
2957        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
2958          request.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
2959        builder.setProcessed(result.isSuccess());
2960        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
2961        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
2962        if (clientCellBlockSupported) {
2963          addSize(context, result.getResult());
2964        }
2965      } else {
2966        Result r = null;
2967        Boolean processed = null;
2968        MutationType type = mutation.getMutateType();
2969        switch (type) {
2970          case APPEND:
2971            // TODO: this doesn't actually check anything.
2972            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
2973              context);
2974            break;
2975          case INCREMENT:
2976            // TODO: this doesn't actually check anything.
2977            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
2978              context);
2979            break;
2980          case PUT:
2981            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
2982            processed = Boolean.TRUE;
2983            break;
2984          case DELETE:
2985            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
2986            processed = Boolean.TRUE;
2987            break;
2988          default:
2989            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
2990        }
2991        if (processed != null) {
2992          builder.setProcessed(processed);
2993        }
2994        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
2995        addResult(builder, r, controller, clientCellBlockSupported);
2996        if (clientCellBlockSupported) {
2997          addSize(context, r);
2998        }
2999      }
3000      return builder.build();
3001    } catch (IOException ie) {
3002      server.checkFileSystem();
3003      throw new ServiceException(ie);
3004    } finally {
3005      if (quota != null) {
3006        quota.close();
3007      }
3008    }
3009  }
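
  // Illustrative sketch (not part of this class): the conditional path of the mutate RPC above,
  // driven through the client API. Names are hypothetical.
  //
  //   try (Connection conn = ConnectionFactory.createConnection(conf);
  //     Table table = conn.getTable(TableName.valueOf("my_table"))) {
  //     byte[] row = Bytes.toBytes("row1");
  //     Put put = new Put(row).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"),
  //       Bytes.toBytes("v"));
  //     // Applies the Put only if cf:q does not exist yet; arrives here with request.hasCondition().
  //     CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
  //       .ifNotExists(Bytes.toBytes("cf"), Bytes.toBytes("q")).build(put));
  //     boolean applied = res.isSuccess();
  //   }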
3010
3011  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
3012    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
3013    long before = EnvironmentEdgeManager.currentTime();
3014    Put put = ProtobufUtil.toPut(mutation, cellScanner);
3015    checkCellSizeLimit(region, put);
3016    spaceQuota.getPolicyEnforcement(region).check(put);
3017    quota.addMutation(put);
3018    region.put(put);
3019
3020    MetricsRegionServer metricsRegionServer = server.getMetrics();
3021    if (metricsRegionServer != null) {
3022      long after = EnvironmentEdgeManager.currentTime();
3023      metricsRegionServer.updatePut(region, after - before);
3024    }
3025  }
3026
3027  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
3028    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
3029    long before = EnvironmentEdgeManager.currentTime();
3030    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
3031    checkCellSizeLimit(region, delete);
3032    spaceQuota.getPolicyEnforcement(region).check(delete);
3033    quota.addMutation(delete);
3034    region.delete(delete);
3035
3036    MetricsRegionServer metricsRegionServer = server.getMetrics();
3037    if (metricsRegionServer != null) {
3038      long after = EnvironmentEdgeManager.currentTime();
3039      metricsRegionServer.updateDelete(region, after - before);
3040    }
3041  }
3042
3043  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
3044    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
3045    ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException {
3046    long before = EnvironmentEdgeManager.currentTime();
3047    long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
3048    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner);
3049    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
3050    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
3051    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
3052    quota.addMutation((Mutation) checkAndMutate.getAction());
3053
3054    CheckAndMutateResult result = null;
3055    if (region.getCoprocessorHost() != null) {
3056      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
3057    }
3058    if (result == null) {
3059      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
3060      if (region.getCoprocessorHost() != null) {
3061        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
3062      }
3063    }
3064    MetricsRegionServer metricsRegionServer = server.getMetrics();
3065    if (metricsRegionServer != null) {
3066      long after = EnvironmentEdgeManager.currentTime();
3067      long blockBytesScanned =
3068        context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
3069      metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned);
3070
3071      MutationType type = mutation.getMutateType();
3072      switch (type) {
3073        case PUT:
3074          metricsRegionServer.updateCheckAndPut(region, after - before);
3075          break;
3076        case DELETE:
3077          metricsRegionServer.updateCheckAndDelete(region, after - before);
3078          break;
3079        default:
3080          break;
3081      }
3082    }
3083    return result;
3084  }
3085
3086  // This is used to keep compatibility with the old client implementation. Consider removing it
3087  // if we decide to drop support for clients that still send a close request to a region scanner
3088  // which has already been exhausted.
3089  @Deprecated
3090  private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {
3091
3092    private static final long serialVersionUID = -4305297078988180130L;
3093
3094    @Override
3095    public synchronized Throwable fillInStackTrace() {
3096      return this;
3097    }
3098  };
3099
3100  private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
3101    String scannerName = toScannerName(request.getScannerId());
3102    RegionScannerHolder rsh = this.scanners.get(scannerName);
3103    if (rsh == null) {
3104      // just ignore the next or close request if the scanner does not exist.
3105      Long lastCallSeq = closedScanners.getIfPresent(scannerName);
3106      if (lastCallSeq != null) {
3107        // Check the sequence number to catch if the last call was incorrectly retried.
3108        // The only allowed scenario is when the scanner is exhausted and one more scan
3109        // request arrives - in this case returning 0 rows is correct.
3110        if (request.hasNextCallSeq() && request.getNextCallSeq() != lastCallSeq + 1) {
3111          throw new OutOfOrderScannerNextException("Expected nextCallSeq for closed request: "
3112            + (lastCallSeq + 1) + " But the nextCallSeq got from client: "
3113            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
3114        } else {
3115          throw SCANNER_ALREADY_CLOSED;
3116        }
3117      } else {
3118        LOG.warn("Client tried to access missing scanner " + scannerName);
3119        throw new UnknownScannerException(
3120          "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
3121            + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
3122            + "long wait between consecutive client checkins, c) Server may be closing down, "
3123            + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
3124            + "possible fix would be increasing the value of the "
3125            + "'hbase.client.scanner.timeout.period' configuration.");
3126      }
3127    }
3128    rejectIfInStandByState(rsh.r);
3129    RegionInfo hri = rsh.s.getRegionInfo();
3130    // Yes, should be the same instance
3131    if (server.getOnlineRegion(hri.getRegionName()) != rsh.r) {
3132      String msg = "Region has changed on the scanner " + scannerName + ": regionName="
3133        + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r;
3134      LOG.warn(msg + ", closing...");
3135      scanners.remove(scannerName);
3136      try {
3137        rsh.s.close();
3138      } catch (IOException e) {
3139        LOG.warn("Getting exception closing " + scannerName, e);
3140      } finally {
3141        try {
3142          server.getLeaseManager().cancelLease(scannerName);
3143        } catch (LeaseException e) {
3144          LOG.warn("Getting exception cancelling lease of " + scannerName, e);
3145        }
3146      }
3147      throw new NotServingRegionException(msg);
3148    }
3149    return rsh;
3150  }
3151
3152  /**
3153   * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
3154   *         value.
3155   */
3156  private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request, HRegion region,
3157    ScanResponse.Builder builder) throws IOException {
3158    rejectIfInStandByState(region);
3159    ClientProtos.Scan protoScan = request.getScan();
3160    boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
3161    Scan scan = ProtobufUtil.toScan(protoScan);
3162    // if the request doesn't set this, get the default region setting.
3163    if (!isLoadingCfsOnDemandSet) {
3164      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
3165    }
3166
3167    if (!scan.hasFamilies()) {
3168      // Adding all families to scanner
3169      for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
3170        scan.addFamily(family);
3171      }
3172    }
3173    if (region.getCoprocessorHost() != null) {
3174      // preScannerOpen is not allowed to return a RegionScanner. Only the post hook can create a
3175      // wrapper for the core-created RegionScanner.
3176      region.getCoprocessorHost().preScannerOpen(scan);
3177    }
3178    RegionScannerImpl coreScanner = region.getScanner(scan);
3179    Shipper shipper = coreScanner;
3180    RegionScanner scanner = coreScanner;
3181    try {
3182      if (region.getCoprocessorHost() != null) {
3183        scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
3184      }
3185    } catch (Exception e) {
3186      // Although region coprocessors are for advanced users who should take care that their
3187      // implementation does not damage the HBase system, closing the scanner on exception here
3188      // does not have any bad side effect, so let's do it.
3189      scanner.close();
3190      throw e;
3191    }
3192    long scannerId = scannerIdGenerator.generateNewScannerId();
3193    builder.setScannerId(scannerId);
3194    builder.setMvccReadPoint(scanner.getMvccReadPoint());
3195    builder.setTtl(scannerLeaseTimeoutPeriod);
3196    String scannerName = toScannerName(scannerId);
3197
3198    boolean fullRegionScan =
3199      !region.getRegionInfo().getTable().isSystemTable() && isFullRegionScan(scan, region);
3200
3201    return new Pair<String, RegionScannerHolder>(scannerName,
3202      addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(), fullRegionScan));
3203  }
3204
3205  /**
3206   * The returned String is used as the key when looking up outstanding Scanners in this Server's
3207   * this.scanners, the Map of outstanding scanners and their current state.
3208   * @param scannerId A scanner long id.
3209   * @return The long id as a String.
3210   */
3211  private static String toScannerName(long scannerId) {
3212    return Long.toString(scannerId);
3213  }
3214
3215  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
3216    throws OutOfOrderScannerNextException {
3217    // if nextCallSeq does not match, throw an Exception straight away. This needs to be
3218    // done even before checking the Lease.
3219    // See HBASE-5974
3220    if (request.hasNextCallSeq()) {
3221      long callSeq = request.getNextCallSeq();
3222      if (!rsh.incNextCallSeq(callSeq)) {
3223        throw new OutOfOrderScannerNextException(
3224          "Expected nextCallSeq: " + rsh.getNextCallSeq() + " But the nextCallSeq got from client: "
3225            + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
3226      }
3227    }
3228  }
3229
3230  private void addScannerLeaseBack(LeaseManager.Lease lease) {
3231    try {
3232      server.getLeaseManager().addLease(lease);
3233    } catch (LeaseStillHeldException e) {
3234      // should not happen as the scanner id is unique.
3235      throw new AssertionError(e);
3236    }
3237  }
3238
3239  // visible for testing only
3240  long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller,
3241    boolean allowHeartbeatMessages) {
3242    // Set the time limit to be half of the more restrictive timeout value (one of the
3243    // timeout values must be positive). In the event that both values are positive, the
3244    // more restrictive of the two is used to calculate the limit.
3245    if (allowHeartbeatMessages) {
3246      long now = EnvironmentEdgeManager.currentTime();
3247      long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now);
3248      if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) {
3249        long timeLimitDelta;
3250        if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) {
3251          timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout);
3252        } else {
3253          timeLimitDelta =
3254            scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout;
3255        }
3256
3257        // Use half of whichever timeout value was more restrictive... But don't allow
3258        // the time limit to be less than the allowable minimum (could cause an
3259        // immediate timeout before scanning any data).
3260        timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
3261        return now + timeLimitDelta;
3262      }
3263    }
3264    // Default value of timeLimit is negative to indicate no timeLimit should be
3265    // enforced.
3266    return -1L;
3267  }
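
  // Worked example for the calculation above (hypothetical values): with
  // scannerLeaseTimeoutPeriod = 60000 ms and a remaining RPC timeout of 30000 ms, the more
  // restrictive value is 30000 ms, so timeLimitDelta = max(30000 / 2, minimumScanTimeLimitDelta)
  // = 15000 ms when the configured minimum is smaller, and the returned limit is now + 15000.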
3268
3269  private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) {
3270    long timeout;
3271    if (controller != null && controller.getCallTimeout() > 0) {
3272      timeout = controller.getCallTimeout();
3273    } else if (rpcTimeout > 0) {
3274      timeout = rpcTimeout;
3275    } else {
3276      return -1;
3277    }
3278    if (call != null) {
3279      timeout -= (now - call.getReceiveTime());
3280    }
3281    // getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high.
3282    // Return the minimum value here in that case so we count this when calculating the final delta.
3283    return Math.max(minimumScanTimeLimitDelta, timeout);
3284  }
3285
3286  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
3287    ScannerContext scannerContext, ScanResponse.Builder builder) {
3288    if (numOfCompleteRows >= limitOfRows) {
3289      if (LOG.isTraceEnabled()) {
3290        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows
3291          + " scannerContext: " + scannerContext);
3292      }
3293      builder.setMoreResults(false);
3294    }
3295  }
3296
3297  // Tells the client, via the response builder, whether we have more results in the region.
3298  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
3299    long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
3300    ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
3301    HRegion region = rsh.r;
3302    RegionScanner scanner = rsh.s;
3303    long maxResultSize;
3304    if (scanner.getMaxResultSize() > 0) {
3305      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
3306    } else {
3307      maxResultSize = maxQuotaResultSize;
3308    }
3309    // This holds the cells inside a row. Default size is 10, so with many versions or many cfs
3310    // we'll resize. Resizings show up in the profiler. Set it higher than 10; for now an
3311    // arbitrary 32. TODO: keep a record of the general size of results being returned.
3312    ArrayList<Cell> values = new ArrayList<>(32);
3313    region.startRegionOperation(Operation.SCAN);
3314    long before = EnvironmentEdgeManager.currentTime();
3315    // Used to check if we've matched the row limit set on the Scan
3316    int numOfCompleteRows = 0;
3317    // Count of times we call nextRaw; can be > numOfCompleteRows.
3318    int numOfNextRawCalls = 0;
3319    try {
3320      int numOfResults = 0;
3321      synchronized (scanner) {
3322        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
3323        boolean clientHandlesPartials =
3324          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
3325        boolean clientHandlesHeartbeats =
3326          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
3327
3328        // On the server side we must ensure that the correct ordering of partial results is
3329        // returned to the client to allow them to properly reconstruct the partial results.
3330        // If the coprocessor host is adding to the result list, we cannot guarantee the
3331        // correct ordering of partial results and so we prevent partial results from being
3332        // formed.
3333        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
3334        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
3335        boolean moreRows = false;
3336
3337        // Heartbeat messages occur when the processing of the ScanRequest exceeds a
3338        // certain time threshold on the server. When the time threshold is exceeded, the
3339        // server stops the scan and sends back whatever Results it has accumulated within
3340        // that time period (may be empty). Since heartbeat messages have the potential to
3341        // create partial Results (in the event that the timeout occurs in the middle of a
3342        // row), we must only generate heartbeat messages when the client can handle both
3343        // heartbeats AND partials
3344        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
3345
3346        long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);
3347
3348        final LimitScope sizeScope =
3349          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
3350        final LimitScope timeScope =
3351          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
3352
3353        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
3354
3355        // Configure with limits for this RPC. Set keep progress true since size progress
3356        // towards size limit should be kept between calls to nextRaw
3357        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
3358        // maxResultSize - either we can reach this much size for all cells(being read) data or sum
3359        // of heap size occupied by cells(being read). Cell data means its key and value parts.
3360        // maxQuotaResultSize - max results just from server side configuration and quotas, without
3361        // user's specified max. We use this for evaluating limits based on blocks (not cells).
3362        // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
3363        // cell or block size from maximum here so we adhere to total limits of request.
3364        // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
3365        // have accumulated block bytes. If not, this will be 0 for block size.
3366        long maxCellSize = maxResultSize;
3367        long maxBlockSize = maxQuotaResultSize;
3368        if (rpcCall != null) {
3369          maxBlockSize -= rpcCall.getBlockBytesScanned();
3370          maxCellSize -= rpcCall.getResponseCellSize();
3371        }
3372
3373        contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
3374        contextBuilder.setBatchLimit(scanner.getBatch());
3375        contextBuilder.setTimeLimit(timeScope, timeLimit);
3376        contextBuilder.setTrackMetrics(trackMetrics);
3377        ScannerContext scannerContext = contextBuilder.build();
3378        boolean limitReached = false;
3379        while (numOfResults < maxResults) {
3380          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
3381          // batch limit is a limit on the number of cells per Result. Thus, if progress is
3382          // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
3383          // reset the batch progress between nextRaw invocations since we don't want the
3384          // batch progress from previous calls to affect future calls
3385          scannerContext.setBatchProgress(0);
3386          assert values.isEmpty();
3387
3388          // Collect values to be returned here
3389          moreRows = scanner.nextRaw(values, scannerContext);
3390          if (rpcCall == null) {
3391            // When there is no RpcCallContext, copy the extended cells to heap before the
3392            // scanner closes. This can be an EXPENSIVE call: it may make an extra copy from
3393            // offheap to onheap buffers. See more details in HBASE-26036.
3394            CellUtil.cloneIfNecessary(values);
3395          }
3396          numOfNextRawCalls++;
3397
3398          if (!values.isEmpty()) {
3399            if (limitOfRows > 0) {
3400              // First we need to check if the last result is partial and we have a row change. If
3401              // so then we need to increase the numOfCompleteRows.
3402              if (results.isEmpty()) {
3403                if (
3404                  rsh.rowOfLastPartialResult != null
3405                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
3406                ) {
3407                  numOfCompleteRows++;
3408                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
3409                    builder);
3410                }
3411              } else {
3412                Result lastResult = results.get(results.size() - 1);
3413                if (
3414                  lastResult.mayHaveMoreCellsInRow()
3415                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
3416                ) {
3417                  numOfCompleteRows++;
3418                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
3419                    builder);
3420                }
3421              }
3422              if (builder.hasMoreResults() && !builder.getMoreResults()) {
3423                break;
3424              }
3425            }
3426            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
3427            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
3428            results.add(r);
3429            numOfResults++;
3430            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
3431              numOfCompleteRows++;
3432              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
3433              if (builder.hasMoreResults() && !builder.getMoreResults()) {
3434                break;
3435              }
3436            }
3437          } else if (!moreRows && !results.isEmpty()) {
3438            // No more cells for the scan here, so we need to ensure that mayHaveMoreCellsInRow of
3439            // the last result is false. Otherwise it's possible that the first nextRaw returned
3440            // because BATCH_LIMIT_REACHED (and it happened to exhaust all cells of the scan), so
3441            // the last result's mayHaveMoreCellsInRow will be true, while the following nextRaw
3442            // returns with moreRows=false, which means moreResultsInRegion would be false: a
3443            // contradictory state (HBASE-21206).
3444            int lastIdx = results.size() - 1;
3445            Result r = results.get(lastIdx);
3446            if (r.mayHaveMoreCellsInRow()) {
3447              results.set(lastIdx, ClientInternalHelper.createResult(
3448                ClientInternalHelper.getExtendedRawCells(r), r.getExists(), r.isStale(), false));
3449            }
3450          }
3451          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
3452          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
3453          boolean resultsLimitReached = numOfResults >= maxResults;
3454          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;
3455
3456          if (limitReached || !moreRows) {
3457            // With a block size limit, we may exceed the size limit without collecting any
3458            // results. In this case we want to send a heartbeat and/or cursor. We don't want to
3459            // send them if results were collected, for example for cell size or heap size limits.
3460            boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
3461            // We only want to mark a ScanResponse as a heartbeat message in the event that
3462            // there are more values to be read server side. If there aren't more values,
3463            // marking it as a heartbeat is wasteful because the client will need to issue
3464            // another ScanRequest only to realize that they already have all the values
3465            if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
3466              // Heartbeat messages occur when the time limit has been reached, or size limit has
3467              // been reached before collecting any results. This can happen for heavily filtered
3468              // scans which scan over too many blocks.
3469              builder.setHeartbeatMessage(true);
3470              if (rsh.needCursor) {
3471                Cell cursorCell = scannerContext.getLastPeekedCell();
3472                if (cursorCell != null) {
3473                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
3474                }
3475              }
3476            }
3477            break;
3478          }
3479          values.clear();
3480        }
3481        if (rpcCall != null) {
3482          rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
3483        }
3484        builder.setMoreResultsInRegion(moreRows);
3485        // Check to see if the client requested that we track metrics server side. If the
3486        // client requested metrics, retrieve the metrics from the scanner context.
3487        if (trackMetrics) {
3488          // rather than increment yet another counter in StoreScanner, just set the value here
3489          // from block size progress before writing into the response
3490          scannerContext.getMetrics().countOfBlockBytesScanned
3491            .set(scannerContext.getBlockSizeProgress());
3492          if (rpcCall != null) {
3493            scannerContext.getMetrics().fsReadTime.set(rpcCall.getFsReadTime());
3494          }
3495          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
3496          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
3497          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
3498
3499          for (Entry<String, Long> entry : metrics.entrySet()) {
3500            pairBuilder.setName(entry.getKey());
3501            pairBuilder.setValue(entry.getValue());
3502            metricBuilder.addMetrics(pairBuilder.build());
3503          }
3504
3505          builder.setScanMetrics(metricBuilder.build());
3506        }
3507      }
3508    } finally {
3509      region.closeRegionOperation();
3510      // Update serverside metrics, even on error.
3511      long end = EnvironmentEdgeManager.currentTime();
3512
3513      long responseCellSize = 0;
3514      long blockBytesScanned = 0;
3515      if (rpcCall != null) {
3516        responseCellSize = rpcCall.getResponseCellSize();
3517        blockBytesScanned = rpcCall.getBlockBytesScanned();
3518        rsh.updateBlockBytesScanned(blockBytesScanned);
3519      }
3520      region.getMetrics().updateScan();
3521      final MetricsRegionServer metricsRegionServer = server.getMetrics();
3522      if (metricsRegionServer != null) {
3523        metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned);
3524        metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls);
3525      }
3526    }
3527    // coprocessor postNext hook
3528    if (region.getCoprocessorHost() != null) {
3529      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
3530    }
3531  }
3532
3533  /**
3534   * Scan data in a table.
3535   * @param controller the RPC controller
3536   * @param request    the scan request
3537   */
3538  @Override
3539  public ScanResponse scan(final RpcController controller, final ScanRequest request)
3540    throws ServiceException {
3541    if (controller != null && !(controller instanceof HBaseRpcController)) {
3542      throw new UnsupportedOperationException(
3543        "We only do HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
3544    }
3545    if (!request.hasScannerId() && !request.hasScan()) {
3546      throw new ServiceException(
3547        new DoNotRetryIOException("Missing required input: scannerId or scan"));
3548    }
3549    try {
3550      checkOpen();
3551    } catch (IOException e) {
3552      if (request.hasScannerId()) {
3553        String scannerName = toScannerName(request.getScannerId());
3554        if (LOG.isDebugEnabled()) {
3555          LOG.debug(
3556            "Server shutting down and client tried to access missing scanner " + scannerName);
3557        }
3558        final LeaseManager leaseManager = server.getLeaseManager();
3559        if (leaseManager != null) {
3560          try {
3561            leaseManager.cancelLease(scannerName);
3562          } catch (LeaseException le) {
3563            // No problem, ignore
3564            if (LOG.isTraceEnabled()) {
3565              LOG.trace("Unable to cancel lease of scanner. It could already be closed.");
3566            }
3567          }
3568        }
3569      }
3570      throw new ServiceException(e);
3571    }
3572    requestCount.increment();
3573    rpcScanRequestCount.increment();
3574    RegionScannerContext rsx;
3575    ScanResponse.Builder builder = ScanResponse.newBuilder();
3576    try {
3577      rsx = checkQuotaAndGetRegionScannerContext(request, builder);
3578    } catch (IOException e) {
3579      if (e == SCANNER_ALREADY_CLOSED) {
3580        // Now we close the scanner automatically if there are no more results for this region,
3581        // but an old client may still send a close request to us. Just ignore it and return.
3582        return builder.build();
3583      }
3584      throw new ServiceException(e);
3585    }
3586    String scannerName = rsx.scannerName;
3587    RegionScannerHolder rsh = rsx.holder;
3588    OperationQuota quota = rsx.quota;
3589    if (rsh.fullRegionScan) {
3590      rpcFullScanRequestCount.increment();
3591    }
3592    HRegion region = rsh.r;
3593    LeaseManager.Lease lease;
3594    try {
3595      // Remove the lease while the request is being processed; protects against the case where
3596      // processing takes longer than the lease expiration time. The lease is null if none was found.
3597      lease = server.getLeaseManager().removeLease(scannerName);
3598    } catch (LeaseException e) {
3599      throw new ServiceException(e);
3600    }
3601    if (request.hasRenew() && request.getRenew()) {
3602      // add back and return
3603      addScannerLeaseBack(lease);
3604      try {
3605        checkScanNextCallSeq(request, rsh);
3606      } catch (OutOfOrderScannerNextException e) {
3607        throw new ServiceException(e);
3608      }
3609      return builder.build();
3610    }
3611    try {
3612      checkScanNextCallSeq(request, rsh);
3613    } catch (OutOfOrderScannerNextException e) {
3614      addScannerLeaseBack(lease);
3615      throw new ServiceException(e);
3616    }
3617    // Now we have increased the next call sequence. If we give the client an error, the retry
3618    // will never succeed. So we'd better close the scanner and return a DoNotRetryIOException to
3619    // the client, and then the client will try to open a new scanner.
3620    boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
3621    int rows; // this is scan.getCaching
3622    if (request.hasNumberOfRows()) {
3623      rows = request.getNumberOfRows();
3624    } else {
3625      rows = closeScanner ? 0 : 1;
3626    }
3627    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
3628    // now let's do the real scan.
3629    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
3630    RegionScanner scanner = rsh.s;
3631    // This is the limit of rows for this scan; if the number of rows reaches this value, we will
3632    // close the scanner.
3633    int limitOfRows;
3634    if (request.hasLimitOfRows()) {
3635      limitOfRows = request.getLimitOfRows();
3636    } else {
3637      limitOfRows = -1;
3638    }
3639    boolean scannerClosed = false;
3640    try {
3641      List<Result> results = new ArrayList<>(Math.min(rows, 512));
3642      if (rows > 0) {
3643        boolean done = false;
3644        // Call coprocessor. Get region info from scanner.
3645        if (region.getCoprocessorHost() != null) {
3646          Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
3647          if (!results.isEmpty()) {
3648            for (Result r : results) {
3649              // add cell size from CP results so we can track response size and update limits
3650              // when calling scan below if !done. We'll also have tracked block size if the CP
3651              // got results from hbase, since StoreScanner tracks that for all calls automatically.
3652              addSize(rpcCall, r);
3653            }
3654          }
3655          if (bypass != null && bypass.booleanValue()) {
3656            done = true;
3657          }
3658        }
3659        if (!done) {
3660          scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
3661            results, builder, rpcCall);
3662        } else {
3663          builder.setMoreResultsInRegion(!results.isEmpty());
3664        }
3665      } else {
3666        // This is an open scanner call with numberOfRows = 0, so set moreResultsInRegion to true.
3667        builder.setMoreResultsInRegion(true);
3668      }
3669
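      // Record the size of the returned results against the operation quota before building
      // the response.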
3670      quota.addScanResult(results);
3671      addResults(builder, results, (HBaseRpcController) controller,
3672        RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
3673        isClientCellBlockSupport(rpcCall));
3674      if (scanner.isFilterDone() && results.isEmpty()) {
3675        // If the scanner's filter - if any - is done with the scan, only set moreResults to false
3676        // if the results are empty. This is used to keep compatibility with the old scan
3677        // implementation where we just ignore the returned results if moreResults is false. We can
3678        // remove the isEmpty check after we get rid of the old implementation.
3679        builder.setMoreResults(false);
3680      }
3681      // Later we may close the scanner depending on this flag, so here we need to make sure
3682      // that we have already set this flag.
3683      assert builder.hasMoreResultsInRegion();
3684      // we only set moreResults to false in the above code, so set it to true if we haven't set it
3685      // yet.
3686      if (!builder.hasMoreResults()) {
3687        builder.setMoreResults(true);
3688      }
3689      if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
3690        // Record the row of the last result if it is a partial result.
3691        // We need this to calculate the complete rows we have returned to the client, as
3692        // mayHaveMoreCellsInRow being true does not mean that there will be extra cells for the
3693        // current row. We may filter out all the remaining cells for the current row and just
3694        // return the cells of the next row when calling RegionScanner.nextRaw. So here we need to
3695        // check for a row change.
3696        Result lastResult = results.get(results.size() - 1);
3697        if (lastResult.mayHaveMoreCellsInRow()) {
3698          rsh.rowOfLastPartialResult = lastResult.getRow();
3699        } else {
3700          rsh.rowOfLastPartialResult = null;
3701        }
3702      }
3703      if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
3704        scannerClosed = true;
3705        closeScanner(region, scanner, scannerName, rpcCall, false);
3706      }
3707
3708      // There's no point returning to a timed-out client. Throwing ensures the scanner is closed.
3709      if (rpcCall != null && EnvironmentEdgeManager.currentTime() > rpcCall.getDeadline()) {
3710        throw new TimeoutIOException("Client deadline exceeded, cannot return results");
3711      }
3712
3713      return builder.build();
3714    } catch (IOException e) {
3715      try {
3716        // scanner is closed here
3717        scannerClosed = true;
3718        // The scanner might be left in a dirty state, so we will tell the client to
3719        // fail this RPC and close the scanner while opening up another one from the start of
3720        // the row that the client last saw.
3721        closeScanner(region, scanner, scannerName, rpcCall, true);
3722
3723        // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
3724        // used in two different semantics.
3725        // (1) The first is to close the client scanner and bubble up the exception all the way
3726        // to the application. This is preferred when the exception is really un-recoverable
3727        // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
3728        // bucket usually.
3729        // (2) Second semantics is to close the current region scanner only, but continue the
3730        // client scanner by overriding the exception. This is usually UnknownScannerException,
3731        // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
3732        // application-level ClientScanner has to continue without bubbling up the exception to
3733        // the client. See ClientScanner code to see how it deals with these special exceptions.
3734        if (e instanceof DoNotRetryIOException) {
3735          throw e;
3736        }
3737
3738        // If it is a FileNotFoundException, wrap as a
3739        // DoNotRetryIOException. This can avoid the retry in ClientScanner.
3740        if (e instanceof FileNotFoundException) {
3741          throw new DoNotRetryIOException(e);
3742        }
3743
3744        // We closed the scanner already. Instead of throwing the IOException and having the
3745        // client retry with the same scannerId only to get a USE on the next RPC, we directly
3746        // throw a special exception to save an RPC.
3747        if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
3748          // 1.4.0+ clients know how to handle
3749          throw new ScannerResetException("Scanner is closed on the server-side", e);
3750        } else {
3751          // older clients do not know about SRE. Just throw USE, which they will handle
3752          throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
3753            + " scanner state for clients older than 1.4.", e);
3754        }
3755      } catch (IOException ioe) {
3756        throw new ServiceException(ioe);
3757      }
3758    } finally {
3759      if (!scannerClosed) {
3760        // Adding the lease back resets its expiration time.
3761        // The closeCallBack will be set in closeScanner, so here we only care about shippedCallback.
3762        if (rpcCall != null) {
3763          rpcCall.setCallBack(rsh.shippedCallback);
3764        } else {
3765          // If the context is null, call rsh.shippedCallback directly to reuse its logic for
3766          // releasing the internal resources in rsh; the lease is also added back to the
3767          // regionserver's LeaseManager in rsh.shippedCallback.
3768          runShippedCallback(rsh);
3769        }
3770      }
3771      quota.close();
3772    }
3773  }
3774
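  /**
   * Runs the shipped callback of the given scanner holder directly. Used when there is no RPC
   * call context to attach the callback to, so that the internal resources of the holder are
   * still released and the scanner lease is added back to the LeaseManager.
   */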
3775  private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
3776    assert rsh.shippedCallback != null;
3777    try {
3778      rsh.shippedCallback.run();
3779    } catch (IOException ioe) {
3780      throw new ServiceException(ioe);
3781    }
3782  }
3783
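  /**
   * Closes the region scanner registered under the given name, honoring the preScannerClose and
   * postScannerClose coprocessor hooks. If an RPC call context is present the actual close is
   * deferred to the holder's close callback, otherwise the scanner is closed inline. When the
   * close is not due to an error, the scanner is remembered in closedScanners so that a late
   * close request from an old client can simply be ignored.
   */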
3784  private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
3785    RpcCallContext context, boolean isError) throws IOException {
3786    if (region.getCoprocessorHost() != null) {
3787      if (region.getCoprocessorHost().preScannerClose(scanner)) {
3788        // bypass the actual close.
3789        return;
3790      }
3791    }
3792    RegionScannerHolder rsh = scanners.remove(scannerName);
3793    if (rsh != null) {
3794      if (context != null) {
3795        context.setCallBack(rsh.closeCallBack);
3796      } else {
3797        rsh.s.close();
3798      }
3799      if (region.getCoprocessorHost() != null) {
3800        region.getCoprocessorHost().postScannerClose(scanner);
3801      }
3802      if (!isError) {
3803        closedScanners.put(scannerName, rsh.getNextCallSeq());
3804      }
3805    }
3806  }
3807
3808  @Override
3809  public CoprocessorServiceResponse execRegionServerService(RpcController controller,
3810    CoprocessorServiceRequest request) throws ServiceException {
3811    rpcPreCheck("execRegionServerService");
3812    return server.execRegionServerService(controller, request);
3813  }
3814
3815  @Override
3816  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
3817    GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
3818    try {
3819      final RegionServerSpaceQuotaManager manager = server.getRegionServerSpaceQuotaManager();
3820      final GetSpaceQuotaSnapshotsResponse.Builder builder =
3821        GetSpaceQuotaSnapshotsResponse.newBuilder();
3822      if (manager != null) {
3823        final Map<TableName, SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
3824        for (Entry<TableName, SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
3825          builder.addSnapshots(TableQuotaSnapshot.newBuilder()
3826            .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
3827            .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())).build());
3828        }
3829      }
3830      return builder.build();
3831    } catch (Exception e) {
3832      throw new ServiceException(e);
3833    }
3834  }
3835
3836  @Override
3837  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
3838    ClearRegionBlockCacheRequest request) throws ServiceException {
3839    try {
3840      rpcPreCheck("clearRegionBlockCache");
3841      ClearRegionBlockCacheResponse.Builder builder = ClearRegionBlockCacheResponse.newBuilder();
3842      CacheEvictionStatsBuilder stats = CacheEvictionStats.builder();
3843      server.getRegionServerCoprocessorHost().preClearRegionBlockCache();
3844      List<HRegion> regions = getRegions(request.getRegionList(), stats);
3845      for (HRegion region : regions) {
3846        try {
3847          stats = stats.append(this.server.clearRegionBlockCache(region));
3848        } catch (Exception e) {
3849          stats.addException(region.getRegionInfo().getRegionName(), e);
3850        }
3851      }
3852      stats.withMaxCacheSize(server.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
3853      server.getRegionServerCoprocessorHost().postClearRegionBlockCache(stats.build());
3854      return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
3855    } catch (IOException e) {
3856      throw new ServiceException(e);
3857    }
3858  }
3859
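  /**
   * Schedules an AssignRegionHandler for every region in the given OpenRegionRequest. The passed
   * in tdCache avoids re-reading the TableDescriptor of the same table for each region.
   */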
3860  private void executeOpenRegionProcedures(OpenRegionRequest request,
3861    Map<TableName, TableDescriptor> tdCache) {
3862    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
3863    long initiatingMasterActiveTime =
3864      request.hasInitiatingMasterActiveTime() ? request.getInitiatingMasterActiveTime() : -1;
3865    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
3866      RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
3867      TableName tableName = regionInfo.getTable();
3868      TableDescriptor tableDesc = tdCache.get(tableName);
3869      if (tableDesc == null) {
3870        try {
3871          tableDesc = server.getTableDescriptors().get(regionInfo.getTable());
3872        } catch (IOException e) {
3873          // Here we do not fail the whole method since we also need to deal with other
3874          // procedures, and we cannot ignore this one, so we still schedule an
3875          // AssignRegionHandler and it will report back to the master if we still cannot get the
3876          // TableDescriptor.
3877          LOG.warn("Failed to get TableDescriptor of {}, will try again in the handler",
3878            regionInfo.getTable(), e);
3879        }
3880        if (tableDesc != null) {
3881          tdCache.put(tableName, tableDesc);
3882        }
3883      }
3884      if (regionOpenInfo.getFavoredNodesCount() > 0) {
3885        server.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
3886          regionOpenInfo.getFavoredNodesList());
3887      }
3888      long procId = regionOpenInfo.getOpenProcId();
3889      if (server.submitRegionProcedure(procId)) {
3890        server.getExecutorService().submit(AssignRegionHandler.create(server, regionInfo, procId,
3891          tableDesc, masterSystemTime, initiatingMasterActiveTime));
3892      }
3893    }
3894  }
3895
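  /**
   * Schedules an UnassignRegionHandler for the region named in the given CloseRegionRequest, if
   * the close procedure id is accepted by submitRegionProcedure.
   */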
3896  private void executeCloseRegionProcedures(CloseRegionRequest request) {
3897    String encodedName;
3898    long initiatingMasterActiveTime =
3899      request.hasInitiatingMasterActiveTime() ? request.getInitiatingMasterActiveTime() : -1;
3900    try {
3901      encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
3902    } catch (DoNotRetryIOException e) {
3903      throw new UncheckedIOException("Should not happen", e);
3904    }
3905    ServerName destination = request.hasDestinationServer()
3906      ? ProtobufUtil.toServerName(request.getDestinationServer())
3907      : null;
3908    long procId = request.getCloseProcId();
3909    boolean evictCache = request.getEvictCache();
3910    if (server.submitRegionProcedure(procId)) {
3911      server.getExecutorService().submit(UnassignRegionHandler.create(server, encodedName, procId,
3912        false, destination, evictCache, initiatingMasterActiveTime));
3913    }
3914  }
3915
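  /**
   * Instantiates and executes a single remote procedure sent by the master. If the procedure
   * class cannot be instantiated, the failure is reported back to the master right away instead
   * of being thrown to the caller.
   */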
3916  private void executeProcedures(RemoteProcedureRequest request) {
3917    RSProcedureCallable callable;
3918    try {
3919      callable = Class.forName(request.getProcClass()).asSubclass(RSProcedureCallable.class)
3920        .getDeclaredConstructor().newInstance();
3921    } catch (Exception e) {
3922      LOG.warn("Failed to instantiate remote procedure {}, pid={}", request.getProcClass(),
3923        request.getProcId(), e);
3924      server.remoteProcedureComplete(request.getProcId(), request.getInitiatingMasterActiveTime(),
3925        e);
3926      return;
3927    }
3928    callable.init(request.getProcData().toByteArray(), server);
3929    LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
3930    server.executeProcedure(request.getProcId(), request.getInitiatingMasterActiveTime(), callable);
3931  }
3932
3933  @Override
3934  @QosPriority(priority = HConstants.ADMIN_QOS)
3935  public ExecuteProceduresResponse executeProcedures(RpcController controller,
3936    ExecuteProceduresRequest request) throws ServiceException {
3937    try {
3938      checkOpen();
3939      throwOnWrongStartCode(request);
3940      server.getRegionServerCoprocessorHost().preExecuteProcedures();
3941      if (request.getOpenRegionCount() > 0) {
3942        // Avoid reading the TableDescriptor every time (usually it would be read from the file
3943        // system).
3944        Map<TableName, TableDescriptor> tdCache = new HashMap<>();
3945        request.getOpenRegionList().forEach(req -> executeOpenRegionProcedures(req, tdCache));
3946      }
3947      if (request.getCloseRegionCount() > 0) {
3948        request.getCloseRegionList().forEach(this::executeCloseRegionProcedures);
3949      }
3950      if (request.getProcCount() > 0) {
3951        request.getProcList().forEach(this::executeProcedures);
3952      }
3953      server.getRegionServerCoprocessorHost().postExecuteProcedures();
3954      return ExecuteProceduresResponse.getDefaultInstance();
3955    } catch (IOException e) {
3956      throw new ServiceException(e);
3957    }
3958  }
3959
3960  @Override
3961  public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller,
3962    GetAllBootstrapNodesRequest request) throws ServiceException {
3963    GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder();
3964    server.getBootstrapNodes()
3965      .forEachRemaining(node -> builder.addNode(ProtobufUtil.toServerName(node)));
3966    return builder.build();
3967  }
3968
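  /**
   * Reads the guardrail settings that can be changed without a restart: the batch row size
   * warning threshold, whether to reject batches over that threshold, and the server-side
   * maximum scanner result size. Re-applied from onConfigurationChange.
   */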
3969  private void setReloadableGuardrails(Configuration conf) {
3970    rowSizeWarnThreshold =
3971      conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
3972    rejectRowsWithSizeOverThreshold =
3973      conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
3974    maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
3975      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
3976  }
3977
3978  @Override
3979  public void onConfigurationChange(Configuration conf) {
3980    super.onConfigurationChange(conf);
3981    setReloadableGuardrails(conf);
3982  }
3983
3984  @Override
3985  public GetCachedFilesListResponse getCachedFilesList(RpcController controller,
3986    GetCachedFilesListRequest request) throws ServiceException {
3987    GetCachedFilesListResponse.Builder responseBuilder = GetCachedFilesListResponse.newBuilder();
3988    List<String> fullyCachedFiles = new ArrayList<>();
3989    server.getBlockCache().flatMap(BlockCache::getFullyCachedFiles).ifPresent(fcf -> {
3990      fullyCachedFiles.addAll(fcf.keySet());
3991    });
3992    return responseBuilder.addAllCachedFiles(fullyCachedFiles).build();
3993  }
3994
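  /**
   * Resolves the RegionScannerHolder for a scan request and checks the scan quota. For a request
   * that carries a scanner id the existing scanner is looked up and the id is echoed back in the
   * response builder (see HBASE-18000); otherwise a new region scanner is created.
   */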
3995  RegionScannerContext checkQuotaAndGetRegionScannerContext(ScanRequest request,
3996    ScanResponse.Builder builder) throws IOException {
3997    if (request.hasScannerId()) {
3998      // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
3999      // for more details.
4000      long scannerId = request.getScannerId();
4001      builder.setScannerId(scannerId);
4002      String scannerName = toScannerName(scannerId);
4003      RegionScannerHolder rsh = getRegionScanner(request);
4004      OperationQuota quota =
4005        getRpcQuotaManager().checkScanQuota(rsh.r, request, maxScannerResultSize,
4006          rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference());
4007      return new RegionScannerContext(scannerName, rsh, quota);
4008    }
4009
4010    HRegion region = getRegion(request.getRegion());
4011    OperationQuota quota =
4012      getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize, 0L, 0L);
4013    Pair<String, RegionScannerHolder> pair = newRegionScanner(request, region, builder);
4014    return new RegionScannerContext(pair.getFirst(), pair.getSecond(), quota);
4015  }
4016}